From c30b075e13c3e862b6694e80acb5099c266502ac Mon Sep 17 00:00:00 2001 From: Renee Tso <8248583+rtso@users.noreply.github.com> Date: Mon, 15 Apr 2024 18:44:50 -0700 Subject: [PATCH] Cleanup post processor (#346) * Cleanup post processor * Update dockerfile --- rust/Cargo.lock | 20 --- rust/Cargo.toml | 15 +- rust/Dockerfile | 3 - rust/post-processor/Cargo.toml | 28 ---- rust/post-processor/config.yaml | 13 -- rust/post-processor/src/lib.rs | 5 - rust/post-processor/src/main.rs | 64 --------- rust/post-processor/src/metrics.rs | 38 ----- .../src/processor_status_checker.rs | 135 ------------------ 9 files changed, 7 insertions(+), 314 deletions(-) delete mode 100644 rust/post-processor/Cargo.toml delete mode 100644 rust/post-processor/config.yaml delete mode 100644 rust/post-processor/src/lib.rs delete mode 100644 rust/post-processor/src/main.rs delete mode 100644 rust/post-processor/src/metrics.rs delete mode 100644 rust/post-processor/src/processor_status_checker.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8626a42b0..c583e7a4f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1839,26 +1839,6 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" -[[package]] -name = "post-processor" -version = "1.0.0" -dependencies = [ - "ahash", - "anyhow", - "async-trait", - "chrono", - "clap", - "futures", - "once_cell", - "prometheus", - "reqwest", - "serde", - "serde_json", - "server-framework", - "tokio", - "tracing", -] - [[package]] name = "postgres-native-tls" version = "0.5.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 6ccc34140..40e9fdfc3 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,13 +1,7 @@ [workspace] resolver = "2" -members = [ - "indexer-metrics", - "moving-average", - "post-processor", - "processor", - "server-framework", -] +members = ["indexer-metrics", "moving-average", "processor", "server-framework"] [workspace.package] authors = ["Aptos Labs "] @@ -45,7 +39,12 @@ diesel = { version = "2.1", features = [ ] } # Use the crate version once this feature gets released on crates.io: # https://github.com/weiznich/diesel_async/commit/e165e8c96a6c540ebde2d6d7c52df5c5620a4bf1 -diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d02798c67065d763154d7272dd0c09b39757d0f2", features = ["async-connection-wrapper", "postgres", "bb8", "tokio"] } +diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d02798c67065d763154d7272dd0c09b39757d0f2", features = [ + "async-connection-wrapper", + "postgres", + "bb8", + "tokio", +] } diesel_migrations = { version = "2.1.0", features = ["postgres"] } diesel_async_migrations = { git = "https://github.com/niroco/diesel_async_migrations", rev = "11f331b73c5cfcc894380074f748d8fda710ac12" } enum_dispatch = "0.3.12" diff --git a/rust/Dockerfile b/rust/Dockerfile index bb293e9ba..481b2f489 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -11,8 +11,6 @@ COPY --link . /app RUN apt-get update && apt-get install -y cmake curl clang git pkg-config libssl-dev libpq-dev lld RUN cargo build --locked --release -p processor RUN cp target/release/processor /usr/local/bin -RUN cargo build --locked --release -p post-processor -RUN cp target/release/post-processor /usr/local/bin RUN cargo build --locked --release -p indexer-metrics RUN cp target/release/indexer-metrics /usr/local/bin @@ -29,7 +27,6 @@ ENV GIT_SHA ${GIT_SHA} FROM debian:bullseye-slim COPY --from=builder /usr/local/bin/processor /usr/local/bin -COPY --from=builder /usr/local/bin/post-processor /usr/local/bin COPY --from=builder /usr/local/bin/indexer-metrics /usr/local/bin RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ diff --git a/rust/post-processor/Cargo.toml b/rust/post-processor/Cargo.toml deleted file mode 100644 index 364434a38..000000000 --- a/rust/post-processor/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "post-processor" -version = "1.0.0" - -# Workspace inherited keys -authors = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -repository = { workspace = true } -rust-version = { workspace = true } - -[dependencies] -ahash = { workspace = true } -anyhow = { workspace = true } -async-trait = { workspace = true } -chrono = { workspace = true } -clap = { workspace = true } -futures = { workspace = true } -once_cell = { workspace = true } -prometheus = { workspace = true } -reqwest = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -server-framework = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } diff --git a/rust/post-processor/config.yaml b/rust/post-processor/config.yaml deleted file mode 100644 index 44eee77e3..000000000 --- a/rust/post-processor/config.yaml +++ /dev/null @@ -1,13 +0,0 @@ -health_check_port: 8088 -server_config: - processor_status_checker_config: - # Endpoint is one of: - # - https://indexer.mainnet.aptoslabs.com/api/rest/get_lastest_processor_status - # - https://indexer-testnet.staging.gcp.aptosdev.com/api/rest/get_lastest_processor_status - # - https://indexer-devnet.staging.gcp.aptosdev.com/api/rest/get_lastest_processor_status - hasura_rest_api_endpoint: "https://indexer.mainnet.aptoslabs.com/api/rest/get_lastest_processor_status" - # fullnode_rest_api_endpoint is one of - # - https://fullnode.mainnet.aptoslabs.com/v1 - # - https://fullnode.testnet.aptoslabs.com/v1 - # - https://fullnode.devnet.aptoslabs.com/v1 - fullnode_rest_api_endpoint: "https://fullnode.mainnet.aptoslabs.com/v1" diff --git a/rust/post-processor/src/lib.rs b/rust/post-processor/src/lib.rs deleted file mode 100644 index e77961406..000000000 --- a/rust/post-processor/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -pub mod metrics; -pub mod processor_status_checker; diff --git a/rust/post-processor/src/main.rs b/rust/post-processor/src/main.rs deleted file mode 100644 index 2b9c1b8b0..000000000 --- a/rust/post-processor/src/main.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use clap::Parser; -use post_processor::{ - metrics::TASK_FAILURE_COUNT, processor_status_checker::ProcessorStatusChecker, -}; -use serde::{Deserialize, Serialize}; -use server_framework::{RunnableConfig, ServerArgs}; -use tracing::info; - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct ProcessorStatusCheckerConfig { - pub hasura_rest_api_endpoint: String, - pub fullnode_rest_api_endpoint: String, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct PostProcessorConfig { - pub processor_status_checker_config: Option, -} - -#[async_trait::async_trait] -impl RunnableConfig for PostProcessorConfig { - async fn run(&self) -> Result<()> { - let mut tasks = vec![]; - - if let Some(config) = &self.processor_status_checker_config { - tasks.push(tokio::spawn({ - let config = config.clone(); - async move { - let checker = ProcessorStatusChecker::new( - config.hasura_rest_api_endpoint.clone(), - config.fullnode_rest_api_endpoint.clone(), - ); - info!("Starting ProcessorStatusChecker"); - if let Err(err) = checker.run().await { - tracing::error!("ProcessorStatusChecker failed: {:?}", err); - TASK_FAILURE_COUNT - .with_label_values(&["processor_status_checker"]) - .inc(); - } - } - })) - } - - let _ = futures::future::join_all(tasks).await; - unreachable!("All tasks should run forever"); - } - - fn get_server_name(&self) -> String { - "idxbg".to_string() - } -} - -#[tokio::main] -async fn main() -> Result<()> { - let args = ServerArgs::parse(); - args.run::(tokio::runtime::Handle::current()) - .await -} diff --git a/rust/post-processor/src/metrics.rs b/rust/post-processor/src/metrics.rs deleted file mode 100644 index 5afa57d78..000000000 --- a/rust/post-processor/src/metrics.rs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use once_cell::sync::Lazy; -use prometheus::{ - register_gauge_vec, register_int_counter_vec, register_int_gauge_vec, GaugeVec, IntCounterVec, - IntGaugeVec, -}; - -/// Task failure count. -pub static TASK_FAILURE_COUNT: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "indexer_processors_post_processing_task_failure_count", - "Task failure count.", - &["task_name"], - ) - .unwrap() -}); - -// API last update time latency to current time in seconds. -pub static HASURA_API_LAST_UPDATED_TIME_LATENCY_IN_SECS: Lazy = Lazy::new(|| { - register_gauge_vec!( - "indexer_processors_hasura_api_last_updated_time_latency_in_secs", - "Processor last update time latency to current time in seconds.", - &["processor_name"], - ) - .unwrap() -}); - -// Processor latest version latency to fullnode latest version. -pub static HASURA_API_LATEST_VERSION_LATENCY: Lazy = Lazy::new(|| { - register_int_gauge_vec!( - "indexer_processors_hasura_api_latest_version_latency", - "Processor latest version latency to fullnode latest version.", - &["processor_name"], - ) - .unwrap() -}); diff --git a/rust/post-processor/src/processor_status_checker.rs b/rust/post-processor/src/processor_status_checker.rs deleted file mode 100644 index 7a06666ad..000000000 --- a/rust/post-processor/src/processor_status_checker.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::metrics::{ - HASURA_API_LAST_UPDATED_TIME_LATENCY_IN_SECS, HASURA_API_LATEST_VERSION_LATENCY, -}; -use ahash::AHashMap; -use anyhow::Result; -use chrono::NaiveDateTime; -use core::panic; -use serde::{Deserialize, Serialize}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tracing::info; - -const PROCESSOR_STATUS_CHECKER_WAIT_TIME_IN_SECS: u64 = 10; - -pub struct ProcessorStatusChecker { - pub hasura_rest_api_endpoint: String, - pub fullnode_rest_api_endpoint: String, -} - -#[derive(Debug, Deserialize, Serialize)] -struct ProcessorStatusResponse { - processor_status: Vec, -} - -#[derive(Debug, Deserialize, Serialize)] -struct ProcessorStatus { - pub processor: String, - pub last_updated: String, - pub last_success_version: i64, -} - -#[derive(Debug, Deserialize, Serialize)] -struct FullnodeResponse { - chain_id: u8, - epoch: String, - ledger_version: String, - oldest_ledger_version: String, - ledger_timestamp: String, - node_role: String, - oldest_block_height: String, - block_height: String, - git_hash: String, -} - -impl ProcessorStatusChecker { - pub fn new(hasura_rest_api_endpoint: String, fullnode_rest_api_endpoint: String) -> Self { - Self { - hasura_rest_api_endpoint, - fullnode_rest_api_endpoint, - } - } - - pub async fn run(&self) -> Result<()> { - loop { - let processor_latest_version_map = handle_hasura_response( - self.hasura_rest_api_endpoint.clone(), - ) - .await - .unwrap_or_else(|e| { - tracing::error!(e = ?e, "Failed to get processor status response from hasura"); - panic!(); - }); - - let fullnode_latest_version = - handle_fullnode_api_response(self.fullnode_rest_api_endpoint.clone()) - .await - .unwrap_or_else(|e| { - tracing::error!(e = ?e, "Failed to get fullnode response from fullnode"); - panic!(); - }); - for processor_latest_version in processor_latest_version_map { - let latency = fullnode_latest_version - processor_latest_version.1; - HASURA_API_LATEST_VERSION_LATENCY - .with_label_values(&[processor_latest_version.0.as_str()]) - .set(latency); - } - tokio::time::sleep(Duration::from_secs( - PROCESSOR_STATUS_CHECKER_WAIT_TIME_IN_SECS, - )) - .await; - } - } -} - -async fn handle_hasura_response(hasura_endpoint: String) -> Result> { - let endpoint = hasura_endpoint.clone(); - info!("Connecting to hasura endpoint: {}", endpoint); - let client = reqwest::Client::new(); - let result = client.get(endpoint).send().await?; - let processor_status_response_result = result.json::().await; - let processor_status_response = match processor_status_response_result { - Ok(processor_status_response) => processor_status_response, - Err(e) => { - anyhow::bail!("Failed to handle hasura api response: {:?}", e); - }, - }; - - let mut processor_latest_version_map = AHashMap::new(); - - for processor_status in processor_status_response.processor_status { - let last_updated_time = NaiveDateTime::parse_from_str( - processor_status.last_updated.as_str(), - "%Y-%m-%dT%H:%M:%S%.f", - ) - .unwrap(); - let current_time = SystemTime::now(); - let latency = current_time.duration_since(UNIX_EPOCH)?.as_secs_f64() - - last_updated_time - .signed_duration_since(NaiveDateTime::from_timestamp_opt(0, 0).unwrap()) - .to_std()? - .as_secs_f64(); - HASURA_API_LAST_UPDATED_TIME_LATENCY_IN_SECS - .with_label_values(&[processor_status.processor.as_str()]) - .set(latency); - processor_latest_version_map.insert( - processor_status.processor, - processor_status.last_success_version, - ); - } - Ok(processor_latest_version_map) -} - -async fn handle_fullnode_api_response(fullnode_endpoint: String) -> Result { - let endpoint = fullnode_endpoint.clone(); - info!("Connecting to fullnode endpoint: {}", endpoint); - let client = reqwest::Client::new(); - let result = client.get(endpoint).send().await?; - let fullnode_response_result = result.json::().await; - match fullnode_response_result { - Ok(fullnode_response) => Ok(fullnode_response.ledger_version.parse::().unwrap()), - Err(e) => anyhow::bail!("Failed to handle fullnode api response: {:?}", e), - } -}