Skip to content

Commit

Permalink
[indexer grpc] server framework + PFN latency monitoring as baseline (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-aptos authored May 17, 2023
1 parent 755ee94 commit 286bf7b
Show file tree
Hide file tree
Showing 39 changed files with 1,005 additions and 597 deletions.
87 changes: 76 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ members = [
"ecosystem/indexer-grpc/indexer-grpc-file-store",
"ecosystem/indexer-grpc/indexer-grpc-fullnode",
"ecosystem/indexer-grpc/indexer-grpc-parser",
"ecosystem/indexer-grpc/indexer-grpc-post-processor",
"ecosystem/indexer-grpc/indexer-grpc-server-framework",
"ecosystem/indexer-grpc/indexer-grpc-utils",
"ecosystem/node-checker",
"ecosystem/node-checker/fn-check-client",
Expand Down Expand Up @@ -287,9 +289,11 @@ aptos-indexer = { path = "crates/indexer" }
aptos-indexer-grpc-cache-worker = { path = "ecosystem/indexer-grpc/indexer-grpc-cache-worker" }
aptos-indexer-grpc-data-service = { path = "ecosystem/indexer-grpc/indexer-grpc-data-service" }
aptos-indexer-grpc-file-store = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store" }
aptos-indexer-grpc-post-processor = { path = "ecosystem/indexer-grpc/indexer-grpc-post-processor" }
aptos-indexer-grpc-fullnode = { path = "ecosystem/indexer-grpc/indexer-grpc-fullnode" }
aptos-indexer-grpc-utils = { path = "ecosystem/indexer-grpc/indexer-grpc-utils" }
aptos-indexer-grpc-parser = { path = "ecosystem/indexer-grpc/indexer-grpc-parser" }
aptos-indexer-grpc-server-framework = { path = "ecosystem/indexer-grpc/indexer-grpc-server-framework" }
aptos-infallible = { path = "crates/aptos-infallible" }
aptos-inspection-service = { path = "crates/aptos-inspection-service" }
aptos-jellyfish-merkle = { path = "storage/jellyfish-merkle" }
Expand Down Expand Up @@ -547,7 +551,7 @@ thiserror = "1.0.37"
tiny-bip39 = "0.8.2"
tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] }
tracing = "0.1.34"
tracing-subscriber = "0.3.11"
tracing-subscriber = { version = "0.3.11", features = ["json", "env-filter"] }
trybuild = "1.0.41"
tokio = { version = "1.21.0", features = ["full"] }
tokio-io-timeout = "1.2.0"
Expand Down
5 changes: 2 additions & 3 deletions ecosystem/indexer-grpc/indexer-grpc-cache-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
aptos-crash-handler = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-moving-average = { workspace = true }
aptos-protos = { workspace = true }
Expand All @@ -30,9 +29,9 @@ futures-core = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
redis = { workspace = true }
redis-test = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
71 changes: 31 additions & 40 deletions ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,42 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Ok, Result};
use aptos_indexer_grpc_cache_worker::worker::Worker;
use aptos_indexer_grpc_utils::register_probes_and_metrics_handler;
use aptos_indexer_grpc_server_framework::{RunnableConfig, ServerArgs};
use aptos_indexer_grpc_utils::config::IndexerGrpcFileStoreConfig;
use clap::Parser;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
};

#[derive(Parser)]
pub struct Args {
#[clap(short, long)]
pub config_path: String,
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcCacheWorkerConfig {
pub server_name: String,
pub fullnode_grpc_address: String,
pub file_store_config: IndexerGrpcFileStoreConfig,
pub redis_main_instance_address: String,
}

fn main() {
aptos_logger::Logger::new().init();
aptos_crash_handler::setup_panic_handler();

// Load config.
let args = Args::parse();
let config = aptos_indexer_grpc_utils::config::IndexerGrpcConfig::load(
std::path::PathBuf::from(args.config_path),
)
.unwrap();

let health_port = config.health_check_port;

let runtime = aptos_runtimes::spawn_named_runtime("indexercache".to_string(), None);

// Start processing.
runtime.spawn(async move {
let mut worker = Worker::new(config).await;
#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcCacheWorkerConfig {
async fn run(&self) -> Result<()> {
let mut worker = Worker::new(
self.fullnode_grpc_address.clone(),
self.redis_main_instance_address.clone(),
self.file_store_config.clone(),
)
.await;
worker.run().await;
});

// Start liveness and readiness probes.
runtime.spawn(async move {
register_probes_and_metrics_handler(health_port).await;
});
Ok(())
}

let term = Arc::new(AtomicBool::new(false));
while !term.load(Ordering::Acquire) {
thread::park();
fn get_server_name(&self) -> String {
self.server_name.clone()
}
}

#[tokio::main]
async fn main() -> Result<()> {
let args = ServerArgs::parse();
args.run::<IndexerGrpcCacheWorkerConfig>().await
}
Loading

0 comments on commit 286bf7b

Please sign in to comment.