From 8963dcc51fc2ac87eaa78fc3d2f7ef3c4fe2250e Mon Sep 17 00:00:00 2001 From: danbugs Date: Wed, 12 Apr 2023 13:58:40 -0700 Subject: [PATCH 1/2] start instrumentation Signed-off-by: danbugs --- Cargo.lock | 112 ++++++++++++++++++++++++++++ Cargo.toml | 4 + crates/keyvalue/Cargo.toml | 2 + crates/keyvalue/src/lib.rs | 11 ++- src/commands/run.rs | 148 +++++++++++++++++++++---------------- src/main.rs | 59 ++++++++++----- 6 files changed, 253 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e44146c2..6efc8fb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2169,6 +2169,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-extras" version = "0.17.2" @@ -2746,6 +2752,86 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e785d273968748578931e4dc3b4f5ec86b26e09d9e0d66b55adda7fce742f7a" +dependencies = [ + "async-trait", + "futures", + "futures-executor", + "once_cell", + "opentelemetry", + "opentelemetry-semantic-conventions", + "thiserror", + "thrift", + "tokio", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b02e0230abb0ab6636d18e2ba8fa02903ea63772281340ccac18e0af3ec9eeb" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.4.1" @@ -3855,6 +3941,8 @@ dependencies = [ "clap", "flate2", "log", + "opentelemetry", + "opentelemetry-jaeger", "rand 0.8.5", "reqwest", "slight-common", @@ -4012,6 +4100,8 @@ dependencies = [ "azure_storage_blobs", "bytes 1.4.0", "futures", + "opentelemetry", + "opentelemetry-jaeger", "redis", "serde_json", "slight-common", @@ -4387,6 +4477,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float", + "threadpool", +] + [[package]] name = "time" version = "0.1.45" diff --git a/Cargo.toml b/Cargo.toml index 98b3b59d..69137287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } reqwest = "0.11" flate2 = "1" tar = "0.4" +opentelemetry = { workspace = true } +opentelemetry-jaeger = { workspace = true } [dev-dependencies] tempdir = { workspace = true } @@ -94,6 +96,8 @@ url = "2.2" semver = "1" clap = { version = "4", features = ["derive"] } rand = "0.8" +opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio" ] } [workspace] members = [ diff --git a/crates/keyvalue/Cargo.toml b/crates/keyvalue/Cargo.toml index bb471616..97e87b1c 100644 --- a/crates/keyvalue/Cargo.toml +++ b/crates/keyvalue/Cargo.toml @@ -19,6 +19,8 @@ anyhow = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-jaeger = { workspace = true } # kv.azblob deps azure_storage_blobs = { version = "0.10", optional = true } azure_storage = { version = "0.10", optional = true } diff --git a/crates/keyvalue/src/lib.rs b/crates/keyvalue/src/lib.rs index 0898e0bd..070d55bc 100644 --- a/crates/keyvalue/src/lib.rs +++ b/crates/keyvalue/src/lib.rs @@ -7,6 +7,7 @@ use anyhow::Result; use async_trait::async_trait; use implementors::*; +use opentelemetry::{global, trace::Tracer}; use slight_common::{impl_resource, BasicState}; /// It is mandatory to `use ::*` due to `impl_resource!`. @@ -147,6 +148,7 @@ impl keyvalue::Keyvalue for Keyvalue { type Keyvalue = KeyvalueInner; async fn keyvalue_open(&mut self, name: &str) -> Result { + let tracer = global::tracer("spiderlightning"); // populate our inner keyvalue object w/ the state received from `slight` // (i.e., what type of keyvalue implementor we are using), and the assigned // name of the object. @@ -164,7 +166,14 @@ impl keyvalue::Keyvalue for Keyvalue { tracing::log::info!("Opening implementor {}", &state.implementor); - let inner = Self::Keyvalue::new(state.implementor.as_str().into(), &state, name).await; + let inner = tracer + .in_span( + format!("opened implementer {}", &state.implementor), + |_cx| async { + Self::Keyvalue::new(state.implementor.as_str().into(), &state, name).await + }, + ) + .await; Ok(inner) } diff --git a/src/commands/run.rs b/src/commands/run.rs index f0996c5d..9729224a 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -5,6 +5,10 @@ use std::{ use anyhow::{bail, Result}; use as_any::Downcast; +use opentelemetry::{ + global, + trace::Tracer, +}; use slight_common::{BasicState, Capability, Ctx as _, WasmtimeBuildable}; use slight_core::slightfile::{Capability as TomlCapability, TomlFile}; #[cfg(feature = "distributed-locking")] @@ -69,63 +73,74 @@ pub struct RunArgs { } pub async fn handle_run(args: RunArgs) -> Result<()> { - let toml_file_contents = - std::fs::read_to_string(args.slightfile.clone()).expect("could not locate slightfile"); - let toml = - toml::from_str::(&toml_file_contents).expect("provided file is not a slightfile"); - - tracing::info!("Starting slight"); - - let mut host_builder = build_store_instance(&toml, &args.slightfile, &args.module).await?; - if let Some(io_redirects) = args.io_redirects.clone() { - tracing::info!("slight io redirects were specified"); - host_builder = host_builder.set_io(io_redirects); - } - let (mut store, instance) = host_builder.build().await; + let tracer = global::tracer("spiderlightning"); + + tracer + .in_span("command handle_run", |_cx| async { + let toml_file_contents = std::fs::read_to_string(args.slightfile.clone()) + .expect("could not locate slightfile"); + let toml = toml::from_str::(&toml_file_contents) + .expect("provided file is not a slightfile"); + + tracing::info!("starting slight"); + + let mut host_builder = tracer + .in_span("handle_run build_store_instance", |_cx| async { + build_store_instance(&toml, &args.slightfile, &args.module).await + }) + .await?; + + if let Some(io_redirects) = args.io_redirects.clone() { + tracing::info!("slight io redirects were specified"); + host_builder = host_builder.set_io(io_redirects); + } + let (mut store, instance) = host_builder.build().await; - let caps = toml.capability.as_ref().unwrap(); - let http_enabled; + let caps = toml.capability.as_ref().unwrap(); + let http_enabled; - if toml.specversion == "0.1" { - http_enabled = caps.iter().any(|cap| cap.name == "http"); - } else if toml.specversion == "0.2" { - http_enabled = caps - .iter() - .any(|cap| cap.resource.as_ref().expect("missing resource field") == "http"); - } else { - bail!("unsupported toml spec version"); - } - // looking for the http capability. - if cfg!(feature = "http-server") && http_enabled { - log::debug!("Http capability enabled"); - update_http_states( - toml, - args.slightfile, - args.module, - &mut store, - args.io_redirects, - ) - .await?; - - // invoke on_server_init - let http_server = - HttpServerInit::new(&mut store, &instance, |ctx| ctx.get_http_server_mut())?; - - let res = http_server.on_server_init(&mut store).await?; - match res { - Ok(_) => {} - Err(e) => bail!(e), - } + if toml.specversion == "0.1" { + http_enabled = caps.iter().any(|cap| cap.name == "http"); + } else if toml.specversion == "0.2" { + http_enabled = caps + .iter() + .any(|cap| cap.resource.as_ref().expect("missing resource field") == "http"); + } else { + bail!("unsupported toml spec version"); + } + // looking for the http capability. + if cfg!(feature = "http-server") && http_enabled { + log::debug!("HTTP capability enabled"); + update_http_states( + toml, + args.slightfile, + args.module, + &mut store, + args.io_redirects, + ) + .await?; + + // invoke on_server_init + let http_server = + HttpServerInit::new(&mut store, &instance, |ctx| ctx.get_http_server_mut())?; + + let res = http_server.on_server_init(&mut store).await?; + match res { + Ok(_) => {} + Err(e) => bail!(e), + } - log::info!("waiting for http to finish..."); - close_http_server(store).await; - } else { - instance - .get_typed_func::<(), _>(&mut store, "_start")? - .call_async(&mut store, ()) - .await?; - } - Ok(()) + log::info!("waiting for http to finish..."); + close_http_server(store).await; + } else { + instance + .get_typed_func::<(), _>(&mut store, "_start")? + .call_async(&mut store, ()) + .await?; + } + Ok(()) + }) + .await } #[cfg(not(feature = "http-server"))] @@ -199,6 +214,7 @@ async fn build_store_instance( toml_file_path: impl AsRef, module: impl AsRef, ) -> Result { + let tracer = global::tracer("spiderlightning"); let mut builder = Builder::from_module(module)?; let mut linked_capabilities: HashSet = HashSet::new(); let mut capability_store: HashMap = HashMap::new(); @@ -230,16 +246,20 @@ async fn build_store_instance( match resource_type { #[cfg(feature = "keyvalue")] _ if KEYVALUE_HOST_IMPLEMENTORS.contains(&resource_type) => { - if !linked_capabilities.contains("keyvalue") { - builder.link_capability::()?; - linked_capabilities.insert("keyvalue".to_string()); - } - - let resource = slight_keyvalue::Keyvalue::new( - resource_type.to_string(), - capability_store.clone(), - ); - builder.add_to_builder("keyvalue".to_string(), resource); + tracer.in_span("build_store_instance link:keyvalue", |_cx| { + if !linked_capabilities.contains("keyvalue") { + builder.link_capability::()?; + linked_capabilities.insert("keyvalue".to_string()); + } + + let resource = slight_keyvalue::Keyvalue::new( + resource_type.to_string(), + capability_store.clone(), + ); + builder.add_to_builder("keyvalue".to_string(), resource); + + Ok::<(), anyhow::Error>(()) + })?; } #[cfg(feature = "distributed-locking")] _ if DISTRIBUTED_LOCKING_HOST_IMPLEMENTORS.contains(&resource_type) => { diff --git a/src/main.rs b/src/main.rs index cf4bd941..0e311a14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,11 @@ use std::path::PathBuf; use anyhow::Result; use clap::Parser; +use opentelemetry::{ + global::{self, shutdown_tracer_provider}, runtime, + sdk::trace as sdktrace, + trace::{TraceError, Tracer}, +}; use slight_lib::{ cli::{Args, Commands}, commands::{ @@ -13,31 +18,49 @@ use slight_lib::{ }, }; +fn init_tracer() -> Result { + opentelemetry_jaeger::new_agent_pipeline() + .with_service_name("spiderlightning") + .install_batch(runtime::Tokio) +} + /// The entry point for slight CLI #[tokio::main] async fn main() -> Result<()> { + let _tracer = init_tracer()?; + let tracer = global::tracer("spiderlightning"); + tracing_subscriber::fmt() .with_writer(std::io::stderr) .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .init(); let args = Args::parse(); - match &args.command { - Commands::Run { module } => { - let run_args = RunArgs { - module: PathBuf::from(&module.path), - slightfile: PathBuf::from(args.config.unwrap()), - ..Default::default() - }; - handle_run(run_args).await - } - Commands::Secret { key, value } => handle_secret(key, value, args.config.unwrap()), - Commands::Add { - interface_at_release, - } => handle_add(interface_at_release.to_owned(), None).await, - Commands::New { - command, - name_at_release, - } => handle_new(name_at_release, command).await, - } + tracer + .in_span("slight command", |_cx| async { + match &args.command { + Commands::Run { module } => { + let run_args = RunArgs { + module: PathBuf::from(&module.path), + slightfile: PathBuf::from(args.config.unwrap()), + ..Default::default() + }; + + handle_run(run_args).await + } + Commands::Secret { key, value } => handle_secret(key, value, args.config.unwrap()), + Commands::Add { + interface_at_release, + } => handle_add(interface_at_release.to_owned(), None).await, + Commands::New { + command, + name_at_release, + } => handle_new(name_at_release, command).await, + } + }) + .await?; + + shutdown_tracer_provider(); + + Ok(()) } From e30602f2a10bbdb5152dcb5894b6c5bb2dac6492 Mon Sep 17 00:00:00 2001 From: danbugs Date: Wed, 12 Apr 2023 14:51:08 -0700 Subject: [PATCH 2/2] fix merge Signed-off-by: danbugs --- Makefile | 5 +++++ crates/keyvalue/src/lib.rs | 2 +- src/commands/run.rs | 14 -------------- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 95ad51df..e511d08e 100644 --- a/Makefile +++ b/Makefile @@ -134,6 +134,11 @@ run-rust: RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/messaging-consumer-demo/nats_slightfile.toml' run ./examples/messaging-consumer-demo/target/wasm32-wasi/release/messaging-consumer-demo.wasm & RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/messaging-producer-demo/nats_slightfile.toml' run ./examples/messaging-producer-demo/target/wasm32-wasi/release/messaging-producer-demo.wasm +.PHONY: quick-telemetry-testrun +quick-telemetry-testrun: + # keyvalue.filesystem + RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/keyvalue-demo/keyvalue_filesystem_slightfile.toml' run ./examples/keyvalue-demo/target/wasm32-wasi/release/keyvalue-demo.wasm + .PHONY: clean-rust clean-rust: cargo clean --manifest-path ./examples/configs-demo/Cargo.toml & \ diff --git a/crates/keyvalue/src/lib.rs b/crates/keyvalue/src/lib.rs index e971b7c0..1eb750b3 100644 --- a/crates/keyvalue/src/lib.rs +++ b/crates/keyvalue/src/lib.rs @@ -171,7 +171,7 @@ impl keyvalue::Keyvalue for Keyvalue { .in_span( format!("opened implementer {}", &state.implementor), |_cx| async { - Self::Keyvalue::new(state.implementor.as_str().into(), &state, name).await + Self::Keyvalue::new(state.implementor.clone().into(), &state, name).await }, ) .await; diff --git a/src/commands/run.rs b/src/commands/run.rs index e176e126..baae22b3 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -51,8 +51,6 @@ pub async fn handle_run(args: RunArgs) -> Result<()> { let http_enabled = has_http_cap(&toml); tracing::info!("Starting slight"); - tracing::info!("starting slight"); - let mut host_builder = tracer .in_span("handle_run build_store_instance", |_cx| async { build_store_instance(&toml, &args.slightfile, &args.module).await @@ -65,18 +63,6 @@ pub async fn handle_run(args: RunArgs) -> Result<()> { } let (mut store, instance) = host_builder.build().await; - let caps = toml.capability.as_ref().unwrap(); - let http_enabled; - - if toml.specversion == "0.1" { - http_enabled = caps.iter().any(|cap| cap.name == "http"); - } else if toml.specversion == "0.2" { - http_enabled = caps - .iter() - .any(|cap| cap.resource.as_ref().expect("missing resource field") == "http"); - } else { - bail!("unsupported toml spec version"); - } // looking for the http capability. if cfg!(feature = "http-server") && http_enabled { log::debug!("HTTP capability enabled");