diff --git a/Cargo.lock b/Cargo.lock index 5c7cd080d..f6deb8d3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2939,6 +2939,7 @@ dependencies = [ "bincode", "clap 3.2.25", "cynic", + "forc-postgres", "fuel-core", "fuel-core-client", "fuel-crypto 0.26.3", @@ -2951,9 +2952,11 @@ dependencies = [ "fuel-vm", "futures", "itertools 0.10.5", + "openssl", "sqlx", "thiserror", "tokio 1.28.2", + "tokio-util 0.7.8", "tracing", "wasmer", "wasmer-compiler-cranelift", @@ -5027,6 +5030,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "111.25.3+1.1.1t" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924757a6a226bf60da5f7dd0311a34d2b52283dd82ddeb103208ddc66362f80c" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.88" @@ -5035,6 +5047,7 @@ checksum = "c2ce0f250f34a308dcfdbb351f511359857d4ed2134ba715a4eadd46e1ffd617" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] diff --git a/Cargo.toml b/Cargo.toml index fa5947425..eeb9891af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "packages/fuel-indexer-tests/components/indices/simple-wasm/simple-wasm", "packages/fuel-indexer-tests/components/web-api", "packages/fuel-indexer-types", + "packages/fuel-indexer", "packages/fuel-indexer-utils", "plugins/forc-index", "plugins/forc-index-tests", @@ -88,4 +89,5 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false } thiserror = "1.0" tokio = "1.17" +tokio-util = "0.7.8" tracing = "0.1" diff --git a/packages/fuel-indexer/Cargo.toml b/packages/fuel-indexer/Cargo.toml index cdbab142c..64d70dc13 100644 --- a/packages/fuel-indexer/Cargo.toml +++ b/packages/fuel-indexer/Cargo.toml @@ -20,6 +20,7 @@ async-trait = "0.1" bincode = { workspace = true } clap = { features = ["cargo", "derive", "env"], workspace = true } cynic = "2.2" +forc-postgres = { workspace = true } fuel-core = { version = "0.17", optional = true } fuel-core-client = "0.17.12" fuel-crypto = { version = "0.26" } @@ -35,11 +36,16 @@ itertools = "0.10" sqlx = { version = "0.6", features = ["bigdecimal"] } thiserror = { workspace = true } tokio = { features = ["macros", "rt-multi-thread", "sync", "process"], workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } wasmer = "2.3" wasmer-compiler-cranelift = { version = "2.3" } wasmer-engine-universal = "2.3" +[dependencies.openssl] +version = "0.10.52" +features = ["vendored"] + [dev-dependencies] fuel-core-client = { version = "0.17.2", features = ["test-helpers"] } diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index dd80763ef..ce16339ea 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -2,7 +2,7 @@ use crate::IndexerService; use fuel_indexer_database::{queries, IndexerConnectionPool}; use fuel_indexer_lib::{ config::{IndexerArgs, IndexerConfig}, - defaults::SERVICE_REQUEST_CHANNEL_SIZE, + defaults, manifest::Manifest, utils::{init_logging, ServiceRequest}, }; @@ -13,11 +13,49 @@ use tracing::info; use fuel_indexer_api_server::api::GraphQlApi; pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { - let IndexerArgs { manifest, .. } = args.clone(); + let IndexerArgs { + manifest, + embedded_database, + postgres_database, + postgres_password, + postgres_port, + postgres_user, + .. + } = args.clone(); + + let args_config = args.config.clone(); + + if embedded_database { + let name = postgres_database + .clone() + .unwrap_or(defaults::POSTGRES_DATABASE.to_string()); + let password = postgres_password + .clone() + .unwrap_or(defaults::POSTGRES_PASSWORD.to_string()); + let user = postgres_user + .clone() + .unwrap_or(defaults::POSTGRES_USER.to_string()); + let port = postgres_port + .clone() + .unwrap_or(defaults::POSTGRES_PORT.to_string()); + + let create_db_cmd = forc_postgres::cli::CreateDbCommand { + name, + password, + user, + port, + persistent: true, + config: args_config.clone(), + start: true, + ..Default::default() + }; + + forc_postgres::commands::create::exec(create_db_cmd).await?; + } let config = args - .clone() .config + .clone() .map(IndexerConfig::from_file) .unwrap_or(Ok(IndexerConfig::from(args)))?; @@ -26,7 +64,7 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { info!("Configuration: {:?}", config); #[allow(unused)] - let (tx, rx) = channel::(SERVICE_REQUEST_CHANNEL_SIZE); + let (tx, rx) = channel::(defaults::SERVICE_REQUEST_CHANNEL_SIZE); let pool = IndexerConnectionPool::connect(&config.database.to_string()).await?; @@ -54,56 +92,79 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { let service_handle = tokio::spawn(service.run()); + // for graceful shutdown + let cancel_token = tokio_util::sync::CancellationToken::new(); + #[cfg(feature = "api-server")] - { - let gql_handle = - tokio::spawn(GraphQlApi::build_and_run(config.clone(), pool, tx)); - - #[cfg(feature = "fuel-core-lib")] - { - use fuel_core::service::{Config, FuelService}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - if config.local_fuel_node { - let config = Config { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000), - ..Config::local_node() - }; - let node_handle = tokio::spawn(FuelService::new_node(config)); - - let _ = tokio::join!(service_handle, node_handle, gql_handle); - - return Ok(()); - } - } + let gql_handle = tokio::spawn(GraphQlApi::build_and_run(config.clone(), pool, tx)); - let _ = tokio::join!(service_handle, gql_handle); + #[cfg(not(feature = "api-server"))] + let gql_handle = tokio::spawn(futures::future::ready(())); + + #[cfg(feature = "fuel-core-lib")] + let node_handle = { + use fuel_core::service::{Config, FuelService}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + if config.local_fuel_node { + let config = Config { + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000), + ..Config::local_node() + }; + tokio::spawn(async move { + let node = FuelService::new_node(config).await.unwrap(); + Some(node) + }) + } else { + tokio::spawn(futures::future::ready(None)) + } + }; - Ok(()) - } + #[cfg(not(feature = "fuel-core-lib"))] + let node_handle = tokio::spawn(futures::future::ready(())); - #[cfg(not(feature = "api-server"))] - { - #[cfg(feature = "fuel-core-lib")] - { - use fuel_core::service::{Config, FuelService}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - if config.local_fuel_node { - let config = Config { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000), - ..Config::local_node() - }; - let node_handle = tokio::spawn(FuelService::new_node(config)); - - let _ = tokio::join!(service_handle, node_handle); - - return Ok(()); - } + // spawn application as separate task + tokio::spawn({ + let cancel_token = cancel_token.clone(); + async move { + let _ = tokio::join!(service_handle, node_handle, gql_handle); + cancel_token.cancel(); } + }); - let _ = service_handle.await?; + use tokio::signal::unix::{signal, Signal, SignalKind}; - Ok(()) + let mut sighup: Signal = signal(SignalKind::hangup())?; + let mut sigterm: Signal = signal(SignalKind::terminate())?; + let mut sigint: Signal = signal(SignalKind::interrupt())?; + + tokio::select! { + _ = sighup.recv() => { + info!("Received SIGHUP. Stopping services"); + } + _ = sigterm.recv() => { + info!("Received SIGTERM. Stopping services"); + } + _ = sigint.recv() => { + info!("Received SIGINT. Stopping services"); + } + _ = cancel_token.cancelled() => { + info!("Received cancellation. Stopping services"); + } } + + if embedded_database { + let name = postgres_database.unwrap_or(defaults::POSTGRES_DATABASE.to_string()); + + let stop_db_cmd = forc_postgres::cli::StopDbCommand { + name, + config: args_config.clone(), + database_dir: None, + verbose: false, + }; + + forc_postgres::commands::stop::exec(stop_db_cmd).await?; + }; + + Ok(()) } diff --git a/packages/fuel-indexer/src/lib.rs b/packages/fuel-indexer/src/lib.rs index aef7f14fc..6a0135a22 100644 --- a/packages/fuel-indexer/src/lib.rs +++ b/packages/fuel-indexer/src/lib.rs @@ -19,6 +19,9 @@ pub use service::IndexerService; use thiserror::Error; use wasmer::{ExportError, HostEnvInitError, InstantiationError, RuntimeError}; +// required for vendored openssl +use openssl as _; + pub mod prelude { pub use super::{ Database, Executor, FtColumn, IndexEnv, IndexerConfig, IndexerError, diff --git a/plugins/forc-index/Cargo.toml b/plugins/forc-index/Cargo.toml index d9a9116c7..298b79a95 100644 --- a/plugins/forc-index/Cargo.toml +++ b/plugins/forc-index/Cargo.toml @@ -13,14 +13,14 @@ description = "Fuel Indexer forc plugin" actix-web = { version = "4", default-features = false, features = ["macros"] } anyhow = "1" -clap = { features = ["derive", "env"] , workspace = true } +clap = { features = ["derive", "env"], workspace = true } forc-postgres = { workspace = true } forc-tracing = { version = "0.31", default-features = false } forc-util = { version = "0.35.0" } -fuel-indexer-lib = { workspace = true } fuel-indexer-database-types = { workspace = true } -fuel-tx = { features = ["builder"] , workspace = true } -fuels = { workspace = true } +fuel-indexer-lib = { workspace = true } +fuel-tx = { features = ["builder"], workspace = true } +fuels = { default-features = false, workspace = true } hex = "0.4.3" hyper-rustls = { version = "0.23", features = ["http2"] } indicatif = "0.17" @@ -30,9 +30,9 @@ reqwest = { version = "0.11", default-features = false, features = ["json", "rus serde = { workspace = true } serde_json = { workspace = true } serde_yaml = "0.8" -tokio = { features = ["macros", "rt-multi-thread", "process"] , workspace = true } -toml = "0.5" tempfile = "3.4.0" +tokio = { features = ["macros", "rt-multi-thread", "process"], workspace = true } +toml = "0.5" tracing = { workspace = true } walkdir = "2" diff --git a/plugins/forc-index/src/ops/forc_index_start.rs b/plugins/forc-index/src/ops/forc_index_start.rs index d870fe18b..fe9b01337 100644 --- a/plugins/forc-index/src/ops/forc_index_start.rs +++ b/plugins/forc-index/src/ops/forc_index_start.rs @@ -1,6 +1,4 @@ use crate::cli::StartCommand; -use forc_postgres::cli::CreateDbCommand; -use fuel_indexer_lib::defaults; use std::process::Command; use tracing::info; @@ -32,35 +30,6 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { .. } = command; - if embedded_database { - let name = postgres_database - .clone() - .unwrap_or(defaults::POSTGRES_DATABASE.to_string()); - let password = postgres_password - .clone() - .unwrap_or(defaults::POSTGRES_PASSWORD.to_string()); - let user = postgres_user - .clone() - .unwrap_or(defaults::POSTGRES_USER.to_string()); - - let port = postgres_port - .clone() - .unwrap_or(defaults::POSTGRES_PORT.to_string()); - - let create_db_cmd = CreateDbCommand { - name, - password, - user, - port, - persistent: true, - config: config.clone(), - start: true, - ..Default::default() - }; - - forc_postgres::commands::create::exec(Box::new(create_db_cmd)).await?; - } - let mut cmd = Command::new("fuel-indexer"); cmd.arg("run"); @@ -107,6 +76,10 @@ pub async fn init(command: StartCommand) -> anyhow::Result<()> { match database.as_ref() { "postgres" => { + if embedded_database { + cmd.arg("--embedded-database"); + } + // Postgres optional values let postgres_optionals = vec![ ("--postgres-user", postgres_user), diff --git a/plugins/forc-postgres/Cargo.toml b/plugins/forc-postgres/Cargo.toml index 0b40b8a05..ab627044b 100644 --- a/plugins/forc-postgres/Cargo.toml +++ b/plugins/forc-postgres/Cargo.toml @@ -19,13 +19,13 @@ path = "src/lib.rs" [dependencies] anyhow = "1" -clap = { features = ["derive", "env"] , workspace = true } +clap = { features = ["derive", "env"], workspace = true } forc-tracing = { version = "0.31", default-features = false } fuel-indexer-lib = { workspace = true } home = "0.5" indicatif = "0.17" pg-embed = { version = "0.7" } -serde = { features = ["derive"] , workspace = true } +serde = { features = ["derive"], workspace = true } serde_json = { workspace = true } -tokio = { features = ["macros", "rt-multi-thread", "process"] , workspace = true } +tokio = { features = ["macros", "rt-multi-thread", "process"], workspace = true } tracing = { workspace = true } diff --git a/plugins/forc-postgres/src/cli.rs b/plugins/forc-postgres/src/cli.rs index add180ab4..148c82d56 100644 --- a/plugins/forc-postgres/src/cli.rs +++ b/plugins/forc-postgres/src/cli.rs @@ -15,7 +15,7 @@ pub struct Opt { #[derive(Subcommand, Debug)] pub enum ForcPostgres { - Create(Box), + Create(CreateDbCommand), Drop(DropDbCommand), Start(StartDbCommand), Stop(StopDbCommand), diff --git a/plugins/forc-postgres/src/commands/create.rs b/plugins/forc-postgres/src/commands/create.rs index cf2db9513..33300faaa 100644 --- a/plugins/forc-postgres/src/commands/create.rs +++ b/plugins/forc-postgres/src/commands/create.rs @@ -89,7 +89,7 @@ impl Default for Command { } } -pub async fn exec(command: Box) -> Result<()> { +pub async fn exec(command: Command) -> Result<()> { let Command { name, user, @@ -104,7 +104,7 @@ pub async fn exec(command: Box) -> Result<()> { start, config, verbose, - } = *command; + } = command; let database_dir = db_dir_or_default(database_dir.as_ref(), &name); @@ -124,5 +124,6 @@ pub async fn exec(command: Box) -> Result<()> { verbose, }) .await?; + Ok(()) } diff --git a/plugins/forc-postgres/src/commands/start.rs b/plugins/forc-postgres/src/commands/start.rs index 1bebb3208..76feee4ac 100644 --- a/plugins/forc-postgres/src/commands/start.rs +++ b/plugins/forc-postgres/src/commands/start.rs @@ -40,5 +40,6 @@ pub async fn exec(command: Command) -> Result<()> { verbose, }) .await?; + Ok(()) } diff --git a/plugins/forc-postgres/src/ops/forc_postgres_createdb.rs b/plugins/forc-postgres/src/ops/forc_postgres_createdb.rs index 2b9c44531..ac3b85c45 100644 --- a/plugins/forc-postgres/src/ops/forc_postgres_createdb.rs +++ b/plugins/forc-postgres/src/ops/forc_postgres_createdb.rs @@ -11,7 +11,7 @@ use pg_embed::{pg_fetch::PgFetchSettings, postgres::PgEmbed}; use std::{fs::File, io::Write, path::PathBuf, time::Duration}; use tracing::info; -fn save_pgembed_config(config: PgEmbedConfig, path: Option<&PathBuf>) -> Result<()> { +fn save_pgembed_config(config: &PgEmbedConfig, path: Option<&PathBuf>) -> Result<()> { if let Some(path) = path { let filename = db_config_file_name(&config.name); let path = path.join(filename); @@ -141,7 +141,7 @@ pub async fn init(command: CreateDbCommand) -> anyhow::Result<()> { == format!("database \"{name}\" already exists") { info!("Database {} already exists", &name); - save_pgembed_config(pg_config, database_dir.as_ref())?; + save_pgembed_config(&pg_config, database_dir.as_ref())?; pb.finish(); if start { @@ -160,7 +160,7 @@ pub async fn init(command: CreateDbCommand) -> anyhow::Result<()> { pg.migrate(&name).await?; } - save_pgembed_config(pg_config, database_dir.as_ref())?; + save_pgembed_config(&pg_config, database_dir.as_ref())?; pb.finish(); diff --git a/plugins/forc-postgres/src/ops/forc_postgres_startdb.rs b/plugins/forc-postgres/src/ops/forc_postgres_startdb.rs index e6265ddff..48bf73f7d 100644 --- a/plugins/forc-postgres/src/ops/forc_postgres_startdb.rs +++ b/plugins/forc-postgres/src/ops/forc_postgres_startdb.rs @@ -1,6 +1,5 @@ use crate::{cli::StartDbCommand, pg::PgEmbedConfig}; use pg_embed::{pg_fetch::PgFetchSettings, postgres::PgEmbed}; -use std::mem::ManuallyDrop; use tracing::info; pub async fn init(command: StartDbCommand) -> anyhow::Result<()> { @@ -12,20 +11,21 @@ pub async fn init(command: StartDbCommand) -> anyhow::Result<()> { .. } = command; - let pg_config = - PgEmbedConfig::from_file(database_dir.as_ref(), config.as_ref(), &name)?; + let mut pg = { + let pg_config = + PgEmbedConfig::from_file(database_dir.as_ref(), config.as_ref(), &name)?; - let version = pg_config.postgres_version.clone(); + let version = pg_config.postgres_version.clone(); - let fetch_settings = PgFetchSettings { - version: version.clone().into(), - ..Default::default() + let fetch_settings = PgFetchSettings { + version: version.clone().into(), + ..Default::default() + }; + let pg = PgEmbed::new(pg_config.clone().into(), fetch_settings).await?; + // Disabling Drop trait behavior as PgEmbed shuts down when going out of scope + std::mem::ManuallyDrop::new(pg) }; - // Disabling Drop trait behavior as PgEmbed shuts down when going out of scope - let mut pg = - ManuallyDrop::new(PgEmbed::new(pg_config.clone().into(), fetch_settings).await?); - info!("\nStarting PostgreSQL...\n"); pg.start_db().await?; diff --git a/plugins/forc-postgres/src/pg.rs b/plugins/forc-postgres/src/pg.rs index 2470a25f8..8f0e4f384 100644 --- a/plugins/forc-postgres/src/pg.rs +++ b/plugins/forc-postgres/src/pg.rs @@ -55,8 +55,9 @@ impl PgEmbedConfig { None => { let filename = db_config_file_name(name); let path = database_dir.join(filename); - let mut file = - File::open(path).expect("PgEmbedConfig file does not exist."); + let mut file = File::open(&path).unwrap_or_else(|_| { + panic!("PgEmbedConfig file {} does not exist.", path.display()) + }); let mut content = String::new(); file.read_to_string(&mut content)?;