Skip to content

Commit

Permalink
[EI-444] Migrate events processor to sdk (#470)
Browse files Browse the repository at this point in the history
* setup

* import events processor

* make it work

* comments

* use db config enum

* comments

* make it work with ending_version

* cargo

* dockerfile

* comments

* lint

* bump sdk

* chain id check

* logging

* bump version that fixes pollable async step

* custom channel size

* bump sdk
  • Loading branch information
rtso authored Aug 14, 2024
1 parent 89844fb commit d1b76cf
Show file tree
Hide file tree
Showing 32 changed files with 3,519 additions and 76 deletions.
902 changes: 830 additions & 72 deletions rust/Cargo.lock

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
[workspace]
resolver = "2"

members = ["indexer-metrics", "moving-average", "processor", "server-framework"]
members = [
"indexer-metrics",
"moving-average",
"processor",
"sdk-processor",
"server-framework",
]

[workspace.package]
authors = ["Aptos Labs <[email protected]>"]
Expand All @@ -16,9 +22,12 @@ rust-version = "1.75"
processor = { path = "processor" }
server-framework = { path = "server-framework" }
aptos-moving-average = { path = "moving-average" }
sdk-processor = { path = "sdk-processor" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.62"
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e1e1bdd9349f0a68c9fc53b7e2cebda9e2ce92b7" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
async-trait = "0.1.53"
Expand Down Expand Up @@ -73,6 +82,7 @@ pbjson = "0.5.1"
prometheus = { version = "0.13.0", default-features = false }
prost = { version = "0.12.3", features = ["no-recursion-limit"] }
prost-types = "0.12.3"
rayon = "1.10.0"
regex = "1.5.5"
reqwest = { version = "0.11.20", features = [
"blocking",
Expand Down Expand Up @@ -111,7 +121,10 @@ postgres-native-tls = "0.5.0"
tokio-postgres = "0.7.10"

# Parquet support
parquet = { version = "52.0.0", default-features = false, features = ["async", "lz4"] }
parquet = { version = "52.0.0", default-features = false, features = [
"async",
"lz4",
] }
num = "0.4.0"
google-cloud-storage = "0.13.0"
hyper = { version = "0.14.18", features = ["full"] }
Expand Down
3 changes: 3 additions & 0 deletions rust/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ RUN cargo build --locked --release -p processor
RUN cp target/release/processor /usr/local/bin
RUN cargo build --locked --release -p indexer-metrics
RUN cp target/release/indexer-metrics /usr/local/bin
RUN cargo build --locked --release -p sdk-processor
RUN cp target/release/sdk-processor /usr/local/bin

# add build info
ARG GIT_TAG
Expand All @@ -29,6 +31,7 @@ FROM debian:bullseye-slim

COPY --from=builder /usr/local/bin/processor /usr/local/bin
COPY --from=builder /usr/local/bin/indexer-metrics /usr/local/bin
COPY --from=builder /usr/local/bin/sdk-processor /usr/local/bin

RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
Expand Down
1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ahash = { workspace = true }
allocative = { workspace = true }
allocative_derive = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-processor-sdk = { workspace = true }
aptos-moving-average = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use config::IndexerGrpcProcessorConfig;

pub mod bq_analytics;
mod config;
mod db;
pub mod db;
pub mod gap_detectors;
pub mod grpc_stream;
pub mod processors;
Expand Down
49 changes: 49 additions & 0 deletions rust/sdk-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[package]
name = "sdk-processor"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-processor-sdk = { workspace = true }
aptos-indexer-processor-sdk-server-framework = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
bigdecimal = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
diesel_migrations = { workspace = true }
field_count = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
jemallocator = { workspace = true }
kanal = { workspace = true }
lazy_static = { workspace = true }
num_cpus = { workspace = true }
processor = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
strum = { workspace = true }
tiny-keccak = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

# Postgres SSL support
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
tokio-postgres = { workspace = true }

[features]
libpq = ["diesel/postgres"]
# When using the default features we enable the diesel/postgres feature. We configure
# it in a feature so the CLI can opt out, since it cannot tolerate the libpq dep.
# Recall that features should always be additive.
default = ["libpq"]
50 changes: 50 additions & 0 deletions rust/sdk-processor/src/config/db_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different db storages that are defined.
/// The configs for each db storage should only contain configuration specific to that
/// type.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Previously we had consts called NAME in each module and a function called `name` on
// the ProcessorTrait. As such it was possible for this name to not match the snake case
// representation of the struct name. By using strum we can have a single source for
// processor names derived from the enum variants themselves.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(DbTypeName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum DbConfig {
PostgresConfig(PostgresConfig),
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
pub connection_string: String,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
#[serde(default = "PostgresConfig::default_db_pool_size")]
pub db_pool_size: u32,
}

impl PostgresConfig {
pub const fn default_db_pool_size() -> u32 {
150
}
}
40 changes: 40 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::events_processor::EventsProcessor;
use anyhow::Result;
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig {
pub processor_config: ProcessorConfig,
pub transaction_stream_config: TransactionStreamConfig,
pub db_config: DbConfig,
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerProcessorConfig {
async fn run(&self) -> Result<()> {
match self.processor_config {
ProcessorConfig::EventsProcessor(_) => {
let events_processor = EventsProcessor::new(self.clone()).await?;
events_processor.run_processor().await
},
}
}

fn get_server_name(&self) -> String {
// Get the part before the first _ and trim to 12 characters.
let before_underscore = self
.processor_config
.name()
.split('_')
.next()
.unwrap_or("unknown");
before_underscore[..before_underscore.len().min(12)].to_string()
}
}
3 changes: 3 additions & 0 deletions rust/sdk-processor/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod db_config;
pub mod indexer_processor_config;
pub mod processor_config;
75 changes: 75 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use crate::processors::events_processor::EventsProcessorConfig;
use serde::{Deserialize, Serialize};

/// This enum captures the configs for all the different processors that are defined.
/// The configs for each processor should only contain configuration specific to that
/// processor. For configuration that is common to all processors, put it in
/// IndexerGrpcProcessorConfig.
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
#[serde(tag = "type", rename_all = "snake_case")]
// What is all this strum stuff? Let me explain.
//
// Previously we had consts called NAME in each module and a function called `name` on
// the ProcessorTrait. As such it was possible for this name to not match the snake case
// representation of the struct name. By using strum we can have a single source for
// processor names derived from the enum variants themselves.
//
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
// generation logic, e.g. to make sure we use snake_case.
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ProcessorName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum ProcessorConfig {
EventsProcessor(EventsProcessorConfig),
}

impl ProcessorConfig {
/// Get the name of the processor config as a static str. This is a convenience
/// method to access the derived functionality implemented by strum::IntoStaticStr.
pub fn name(&self) -> &'static str {
self.into()
}
}
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
// build path we derive EnumDiscriminants on this enum as well and make sure the two
// sets of variants match up in `test_processor_names_complete`.
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
strum_discriminants(
derive(strum::EnumVariantNames),
name(ProcessorDiscriminants),
strum(serialize_all = "snake_case")
)
)]
pub enum Processor {
EventsProcessor,
}

#[cfg(test)]
mod test {
use super::*;
use strum::VariantNames;

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
/// variants are in the same order (lexicographical) and the names match.
#[test]
fn test_processor_names_complete() {
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);
}
}
1 change: 1 addition & 0 deletions rust/sdk-processor/src/db/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
79 changes: 79 additions & 0 deletions rust/sdk-processor/src/db/common/models/events_models/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

// Copied from processor crate. The only difference is the protos are imported from the SDK
// instead of the aptos-protos crate.
use aptos_indexer_processor_sdk::aptos_protos::transaction::v1::Event as EventPB;
use diesel::{Identifiable, Insertable};
use field_count::FieldCount;
use processor::{
schema::events,
utils::util::{standardize_address, truncate_str},
};
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, event_index))]
#[diesel(table_name = events)]
pub struct Event {
pub sequence_number: i64,
pub creation_number: i64,
pub account_address: String,
pub transaction_version: i64,
pub transaction_block_height: i64,
pub type_: String,
pub data: serde_json::Value,
pub event_index: i64,
pub indexed_type: String,
}

impl Event {
pub fn from_event(
event: &EventPB,
transaction_version: i64,
transaction_block_height: i64,
event_index: i64,
) -> Self {
let t: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
transaction_version,
transaction_block_height,
type_: t.to_string(),
data: serde_json::from_str(event.data.as_str()).unwrap(),
event_index,
indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH),
}
}

pub fn from_events(
events: &[EventPB],
transaction_version: i64,
transaction_block_height: i64,
) -> Vec<Self> {
events
.iter()
.enumerate()
.map(|(index, event)| {
Self::from_event(
event,
transaction_version,
transaction_block_height,
index as i64,
)
})
.collect::<Vec<EventModel>>()
}
}

// Prevent conflicts with other things named `Event`
pub type EventModel = Event;
4 changes: 4 additions & 0 deletions rust/sdk-processor/src/db/common/models/events_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod events;
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod events_models;
pub mod processor_status;
Loading

0 comments on commit d1b76cf

Please sign in to comment.