From 365d2b12c2645115ae703f6806cc6258599ffaf9 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Wed, 22 Nov 2023 14:21:20 -0800 Subject: [PATCH 1/8] save work --- .cargo/config.toml | 2 + Cargo.toml | 4 +- core/common/src/grpc_interceptor.rs | 9 +++- core/common/src/utils.rs | 67 ++++++++++++++++++++++-- core/invehicle-digital-twin/Cargo.toml | 4 +- core/invehicle-digital-twin/src/main.rs | 36 ++++++++++++- core/module/managed_subscribe/Cargo.toml | 2 +- core/protobuf_data_access/Cargo.toml | 2 +- samples/command/Cargo.toml | 2 +- samples/managed_subscribe/Cargo.toml | 2 +- samples/mixed/Cargo.toml | 2 +- samples/property/Cargo.toml | 2 +- samples/protobuf_data_access/Cargo.toml | 2 +- samples/seat_massager/Cargo.toml | 2 +- samples/streaming/Cargo.toml | 2 +- samples/tutorial/Cargo.toml | 2 +- 16 files changed, 123 insertions(+), 19 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..16f1e73a --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 85bd1bba..cf9d2923 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ serde_derive = "1.0.163" serde_json = "^1.0" strum = "0.25" strum_macros = "0.25.1" -tokio = "1.29.1" +tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread", "tracing", "signal"] } tokio-stream = "0.1.14" tonic = "0.10.0" tonic-build = "0.10.0" @@ -72,3 +72,5 @@ tower-http = "0.4.3" url = "2.3.1" uuid = "1.2.2" yaml-rust = "0.4" +# for tokio console +console-subscriber = "0.2.0" diff --git a/core/common/src/grpc_interceptor.rs b/core/common/src/grpc_interceptor.rs index ecf5906d..ed3a22fd 100644 --- a/core/common/src/grpc_interceptor.rs +++ b/core/common/src/grpc_interceptor.rs @@ -10,10 +10,12 @@ use http::uri::Uri; use http_body::Body; use log::warn; use regex::Regex; -use std::error::Error; +use std::{error::Error, time::Duration}; use std::pin::Pin; use tower::{Layer, Service}; +use crate::utils; + // This module provides the gRPC Interceptor construct. It can be used to // intercept gRPC calls, and examine/modify their requests and responses. @@ -157,9 +159,10 @@ where let is_applicable = interceptor.is_applicable(&service_name, &method_name); if is_applicable && interceptor.must_handle_request() { + log::warn!("Request intercepted"); let (parts, body) = request.into_parts(); let mut body_bytes: Bytes = - match futures::executor::block_on(hyper::body::to_bytes(body)) { + match utils::block_on(hyper::body::to_bytes(body), Duration::from_secs(10)) { Ok(bytes) => bytes, Err(err) => { return Box::pin(async move { @@ -167,6 +170,7 @@ where }) } }; + log::warn!("Body parsed"); let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH); let grpc_header_bytes = body_bytes; let new_protobuf_message_bytes: Bytes = match interceptor.handle_request( @@ -182,6 +186,7 @@ where let stream = futures_util::stream::iter(new_body_chunks); let new_body = tonic::transport::Body::wrap_stream(stream); request = http::request::Request::from_parts(parts, new_body); + log::warn!("Finished handling request"); } let fut = self.service.call(request); diff --git a/core/common/src/utils.rs b/core/common/src/utils.rs index ab732d06..5be73239 100644 --- a/core/common/src/utils.rs +++ b/core/common/src/utils.rs @@ -8,12 +8,14 @@ use config::{Config, ConfigError, File, FileFormat}; use core_protobuf_data_access::chariott::service_discovery::core::v1::{ service_registry_client::ServiceRegistryClient, DiscoverRequest, }; -use log::{debug, info}; +use log::{debug, info, error}; use serde_derive::Deserialize; -use std::env; +use tokio::time::timeout; +use std::any::Any; +use std::{env, thread, panic}; use std::future::Future; use strum_macros::Display; -use tokio::time::{sleep, Duration}; +use tokio::{time::{sleep, Duration}, runtime::Handle}; use tonic::{Request, Status}; const IBEJI_HOME_VAR_NAME: &str = "IBEJI_HOME"; @@ -218,6 +220,65 @@ pub fn get_uri(uri: &str) -> Result { Ok(uri.to_string()) } +/// Blocks on an asynchronous task, waiting for it to complete. +/// This can be used to execute async code in a sync context. +/// This will start a new thread using the current tokio runtime, +/// and will panic when called outside of a tokio runtime context. +/// This method will also panic if joining with the spawned thread panics, +/// including if the passed-in future panics. +/// +/// For more information on acquiring a runtime handle and the panic conditions, +/// see https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.current +/// +/// # Arguments +/// - `future`: the future to execute +/// - `timeout`: the maximum amount of time that `future` should execute before being cancelled +pub fn block_on(future: F, timeout: Duration) -> Result> +where + F: Future> + Send + 'static, + T: Send + 'static, + E: Send + 'static, +{ + let handle = Handle::current(); + let thread = thread::spawn(move || { + handle.block_on(async move { + tokio::time::timeout(timeout, future).await + }) + }); + + match thread.join() { + Ok(Ok(r)) => r.map_err(|e| SomeError::InnerError(e)), + Ok(Err(_)) => Err(SomeError::TimeoutError), + Err(e) => Err(SomeError::JoinError(format!("{e:?}"))), + } +} + +#[derive(Debug)] +pub enum SomeError { + TimeoutError, + JoinError(String), + InnerError(E), +} + +impl std::error::Error for SomeError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SomeError::InnerError(e) => Some(e), + _ => None, + } + } +} + +impl std::fmt::Display for SomeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SomeError::TimeoutError => write!(f, "Execution timed out"), + SomeError::JoinError(s) => write!(f, "Joining thread failed with {s}"), + SomeError::InnerError(e) => write!(f, "Error during execution: {e}"), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/invehicle-digital-twin/Cargo.toml b/core/invehicle-digital-twin/Cargo.toml index f929580f..be71804d 100644 --- a/core/invehicle-digital-twin/Cargo.toml +++ b/core/invehicle-digital-twin/Cargo.toml @@ -27,11 +27,13 @@ serde_derive = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true , features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tonic = { workspace = true } tower = { workspace = true } url = { workspace = true } yaml-rust = { workspace = true } +# for tokio console +console-subscriber = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/core/invehicle-digital-twin/src/main.rs b/core/invehicle-digital-twin/src/main.rs index be4c1aff..321f6578 100644 --- a/core/invehicle-digital-twin/src/main.rs +++ b/core/invehicle-digital-twin/src/main.rs @@ -26,7 +26,9 @@ use parking_lot::RwLock; use std::boxed::Box; use std::collections::HashMap; use std::convert::Infallible; +use std::env; use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Arc; use tonic::body::BoxBody; use tonic::transport::{Body, NamedService}; @@ -37,6 +39,7 @@ use tower::Service; mod invehicle_digital_twin_config; mod invehicle_digital_twin_impl; +const DEFAULT_LOG_LEVEL: &str = "info"; const INVEHICLE_DIGITAL_TWIN_SERVICE_NAMESPACE: &str = "sdv.ibeji"; const INVEHICLE_DIGITAL_TWIN_SERVICE_NAME: &str = "invehicle_digital_twin"; const INVEHICLE_DIGITAL_TWIN_SERVICE_VERSION: &str = "1.0"; @@ -136,8 +139,37 @@ where #[tokio::main] async fn main() -> Result<(), Box> { - // Setup logging. - Builder::new().filter(None, LevelFilter::Info).target(Target::Stdout).init(); + let args: HashMap> = env::args() + .skip(1) + .map(|arg| { + let mut split = arg.split('='); + let key = split + .next() + .expect("Couldn't parse argument key") + .to_owned(); + let val = split.next().map(|v| v.to_owned()); + + if split.next().is_some() { + panic!("Too many pieces in argument"); + } + + (key, val) + }) + .collect(); + + println!("args: {args:?}"); + + // Setup logging + let log_level_arg = args.get("--log-level") + .cloned() + .unwrap_or(Some(DEFAULT_LOG_LEVEL.to_owned())) + .expect("No log-level value provided"); + let log_level = LevelFilter::from_str(log_level_arg.as_str()) + .expect("Could not parse log level"); + Builder::new().filter(None, log_level).target(Target::Stdout).init(); + + // enables tokio tracing + //console_subscriber::init(); info!("The In-Vehicle Digital Twin Service has started."); diff --git a/core/module/managed_subscribe/Cargo.toml b/core/module/managed_subscribe/Cargo.toml index 81b63cc2..bac7d4da 100644 --- a/core/module/managed_subscribe/Cargo.toml +++ b/core/module/managed_subscribe/Cargo.toml @@ -20,7 +20,7 @@ serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true , features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tonic = { workspace = true } tower = { workspace = true } yaml-rust = { workspace = true } diff --git a/core/protobuf_data_access/Cargo.toml b/core/protobuf_data_access/Cargo.toml index f97cef5f..7beef1fa 100644 --- a/core/protobuf_data_access/Cargo.toml +++ b/core/protobuf_data_access/Cargo.toml @@ -12,7 +12,7 @@ license = "MIT" prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tonic = { workspace = true } [build-dependencies] diff --git a/samples/command/Cargo.toml b/samples/command/Cargo.toml index af54909d..5286075b 100644 --- a/samples/command/Cargo.toml +++ b/samples/command/Cargo.toml @@ -20,7 +20,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/managed_subscribe/Cargo.toml b/samples/managed_subscribe/Cargo.toml index f82adb36..5f95a27e 100644 --- a/samples/managed_subscribe/Cargo.toml +++ b/samples/managed_subscribe/Cargo.toml @@ -23,7 +23,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tokio = { workspace = true } tonic = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/mixed/Cargo.toml b/samples/mixed/Cargo.toml index 4ba43ce3..4928e24e 100644 --- a/samples/mixed/Cargo.toml +++ b/samples/mixed/Cargo.toml @@ -20,7 +20,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/property/Cargo.toml b/samples/property/Cargo.toml index e24508d1..6bb2d2cd 100644 --- a/samples/property/Cargo.toml +++ b/samples/property/Cargo.toml @@ -21,7 +21,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tokio = { workspace = true } tonic = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/protobuf_data_access/Cargo.toml b/samples/protobuf_data_access/Cargo.toml index 89e9b75a..c074b4fd 100644 --- a/samples/protobuf_data_access/Cargo.toml +++ b/samples/protobuf_data_access/Cargo.toml @@ -13,7 +13,7 @@ tonic = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/samples/seat_massager/Cargo.toml b/samples/seat_massager/Cargo.toml index 669160c2..f0c18e68 100644 --- a/samples/seat_massager/Cargo.toml +++ b/samples/seat_massager/Cargo.toml @@ -19,7 +19,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } diff --git a/samples/streaming/Cargo.toml b/samples/streaming/Cargo.toml index 1a1b8b41..68682452 100644 --- a/samples/streaming/Cargo.toml +++ b/samples/streaming/Cargo.toml @@ -24,7 +24,7 @@ sdl2 = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync"] } +tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } url = { workspace = true } diff --git a/samples/tutorial/Cargo.toml b/samples/tutorial/Cargo.toml index 88fe34c7..63fabea3 100644 --- a/samples/tutorial/Cargo.toml +++ b/samples/tutorial/Cargo.toml @@ -17,7 +17,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tokio = { workspace = true } tonic = { workspace = true } url = { workspace = true } From 9be2e5e754c68d06d357201404fa4f156a7047a3 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Wed, 22 Nov 2023 14:36:13 -0800 Subject: [PATCH 2/8] re-enable tokio console support --- core/invehicle-digital-twin/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/invehicle-digital-twin/src/main.rs b/core/invehicle-digital-twin/src/main.rs index 321f6578..170b4ce7 100644 --- a/core/invehicle-digital-twin/src/main.rs +++ b/core/invehicle-digital-twin/src/main.rs @@ -169,7 +169,7 @@ async fn main() -> Result<(), Box> { Builder::new().filter(None, log_level).target(Target::Stdout).init(); // enables tokio tracing - //console_subscriber::init(); + console_subscriber::init(); info!("The In-Vehicle Digital Twin Service has started."); From cebf7c99c060a0a09baeeebfd95e254c9450d196 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:39:42 -0800 Subject: [PATCH 3/8] put tokio console support behind feature --- .cargo/config.toml | 2 -- Cargo.toml | 5 ++--- README.md | 11 +++++++++++ core/invehicle-digital-twin/Cargo.toml | 6 +++--- core/invehicle-digital-twin/src/main.rs | 8 ++++++-- samples/command/Cargo.toml | 2 +- samples/managed_subscribe/Cargo.toml | 2 +- samples/mixed/Cargo.toml | 2 +- samples/property/Cargo.toml | 2 +- samples/protobuf_data_access/Cargo.toml | 2 +- samples/seat_massager/Cargo.toml | 2 +- samples/streaming/Cargo.toml | 2 +- samples/tutorial/Cargo.toml | 2 +- 13 files changed, 30 insertions(+), 18 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index 16f1e73a..00000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index cf9d2923..ebaa5b25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,8 @@ serde_derive = "1.0.163" serde_json = "^1.0" strum = "0.25" strum_macros = "0.25.1" -tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread", "tracing", "signal"] } +tokio = "1.29.1" +tokio-console-subscriber = { version = "0.2.0", package = "console-subscriber" } tokio-stream = "0.1.14" tonic = "0.10.0" tonic-build = "0.10.0" @@ -72,5 +73,3 @@ tower-http = "0.4.3" url = "2.3.1" uuid = "1.2.2" yaml-rust = "0.4" -# for tokio console -console-subscriber = "0.2.0" diff --git a/README.md b/README.md index 14993689..10c9f050 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ - [Install MQTT Broker](#install-mqtt-broker) - [Cloning the Repo](#cloning-the-repo) - [Building](#building) + - [Tokio Console Support](#tokio-console-support) - [Running the Tests](#running-the-tests) - [Running the Samples](#running-the-samples) - [Property Sample](#property-sample) @@ -107,6 +108,16 @@ Once you have installed the prerequisites, go to your enlistment's root director This should build all of the libraries and executables. +### Tokio Console Support + +Ibeji has support for using the [tokio console](https://github.com/tokio-rs/console) for advanced debugging. To enable this support, you need to build with the `tokio_console` feature enabled and with the `tokio_unstable` config flag for the rust compiler: + +```bash +RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console +``` + +Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging wih the tokio console. + ## Running the Tests After successfully building Ibeji, you can run all of the unit tests. To do this go to the enlistment's root directory and run: diff --git a/core/invehicle-digital-twin/Cargo.toml b/core/invehicle-digital-twin/Cargo.toml index be71804d..79509142 100644 --- a/core/invehicle-digital-twin/Cargo.toml +++ b/core/invehicle-digital-twin/Cargo.toml @@ -27,13 +27,12 @@ serde_derive = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio-console-subscriber = { workspace = true, optional = true } tonic = { workspace = true } tower = { workspace = true } url = { workspace = true } yaml-rust = { workspace = true } -# for tokio console -console-subscriber = { workspace = true } [build-dependencies] tonic-build = { workspace = true } @@ -41,3 +40,4 @@ tonic-build = { workspace = true } [features] managed_subscribe = ["dep:managed_subscribe"] containerize = ["common/containerize"] +tokio_console = ["dep:tokio-console-subscriber", "tokio/tracing"] \ No newline at end of file diff --git a/core/invehicle-digital-twin/src/main.rs b/core/invehicle-digital-twin/src/main.rs index 170b4ce7..7083cb9a 100644 --- a/core/invehicle-digital-twin/src/main.rs +++ b/core/invehicle-digital-twin/src/main.rs @@ -168,8 +168,12 @@ async fn main() -> Result<(), Box> { .expect("Could not parse log level"); Builder::new().filter(None, log_level).target(Target::Stdout).init(); - // enables tokio tracing - console_subscriber::init(); + #[cfg(feature = "tokio_console")] + { + // enable tracing for tokio console + info!("Tokio console tracing is enabled"); + tokio_console_subscriber::init(); + } info!("The In-Vehicle Digital Twin Service has started."); diff --git a/samples/command/Cargo.toml b/samples/command/Cargo.toml index 5286075b..af54909d 100644 --- a/samples/command/Cargo.toml +++ b/samples/command/Cargo.toml @@ -20,7 +20,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/managed_subscribe/Cargo.toml b/samples/managed_subscribe/Cargo.toml index 5f95a27e..f82adb36 100644 --- a/samples/managed_subscribe/Cargo.toml +++ b/samples/managed_subscribe/Cargo.toml @@ -23,7 +23,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/mixed/Cargo.toml b/samples/mixed/Cargo.toml index 4928e24e..4ba43ce3 100644 --- a/samples/mixed/Cargo.toml +++ b/samples/mixed/Cargo.toml @@ -20,7 +20,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/property/Cargo.toml b/samples/property/Cargo.toml index 6bb2d2cd..e24508d1 100644 --- a/samples/property/Cargo.toml +++ b/samples/property/Cargo.toml @@ -21,7 +21,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/samples/protobuf_data_access/Cargo.toml b/samples/protobuf_data_access/Cargo.toml index c074b4fd..89e9b75a 100644 --- a/samples/protobuf_data_access/Cargo.toml +++ b/samples/protobuf_data_access/Cargo.toml @@ -13,7 +13,7 @@ tonic = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } [build-dependencies] tonic-build = { workspace = true } diff --git a/samples/seat_massager/Cargo.toml b/samples/seat_massager/Cargo.toml index f0c18e68..669160c2 100644 --- a/samples/seat_massager/Cargo.toml +++ b/samples/seat_massager/Cargo.toml @@ -19,7 +19,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } diff --git a/samples/streaming/Cargo.toml b/samples/streaming/Cargo.toml index 68682452..e1ffacd1 100644 --- a/samples/streaming/Cargo.toml +++ b/samples/streaming/Cargo.toml @@ -24,7 +24,7 @@ sdl2 = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true } tonic = { workspace = true } url = { workspace = true } diff --git a/samples/tutorial/Cargo.toml b/samples/tutorial/Cargo.toml index 63fabea3..265285f2 100644 --- a/samples/tutorial/Cargo.toml +++ b/samples/tutorial/Cargo.toml @@ -17,7 +17,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tonic = { workspace = true } url = { workspace = true } From 4b3939570fd7ed7ebc89da071a8935104246b40c Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:45:08 -0800 Subject: [PATCH 4/8] revert stuff back to main --- core/common/src/grpc_interceptor.rs | 9 +--- core/common/src/utils.rs | 67 ++---------------------- core/invehicle-digital-twin/src/main.rs | 2 +- core/module/managed_subscribe/Cargo.toml | 2 +- core/protobuf_data_access/Cargo.toml | 2 +- samples/streaming/Cargo.toml | 2 +- samples/tutorial/Cargo.toml | 2 +- 7 files changed, 10 insertions(+), 76 deletions(-) diff --git a/core/common/src/grpc_interceptor.rs b/core/common/src/grpc_interceptor.rs index ed3a22fd..ecf5906d 100644 --- a/core/common/src/grpc_interceptor.rs +++ b/core/common/src/grpc_interceptor.rs @@ -10,12 +10,10 @@ use http::uri::Uri; use http_body::Body; use log::warn; use regex::Regex; -use std::{error::Error, time::Duration}; +use std::error::Error; use std::pin::Pin; use tower::{Layer, Service}; -use crate::utils; - // This module provides the gRPC Interceptor construct. It can be used to // intercept gRPC calls, and examine/modify their requests and responses. @@ -159,10 +157,9 @@ where let is_applicable = interceptor.is_applicable(&service_name, &method_name); if is_applicable && interceptor.must_handle_request() { - log::warn!("Request intercepted"); let (parts, body) = request.into_parts(); let mut body_bytes: Bytes = - match utils::block_on(hyper::body::to_bytes(body), Duration::from_secs(10)) { + match futures::executor::block_on(hyper::body::to_bytes(body)) { Ok(bytes) => bytes, Err(err) => { return Box::pin(async move { @@ -170,7 +167,6 @@ where }) } }; - log::warn!("Body parsed"); let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH); let grpc_header_bytes = body_bytes; let new_protobuf_message_bytes: Bytes = match interceptor.handle_request( @@ -186,7 +182,6 @@ where let stream = futures_util::stream::iter(new_body_chunks); let new_body = tonic::transport::Body::wrap_stream(stream); request = http::request::Request::from_parts(parts, new_body); - log::warn!("Finished handling request"); } let fut = self.service.call(request); diff --git a/core/common/src/utils.rs b/core/common/src/utils.rs index 5be73239..ab732d06 100644 --- a/core/common/src/utils.rs +++ b/core/common/src/utils.rs @@ -8,14 +8,12 @@ use config::{Config, ConfigError, File, FileFormat}; use core_protobuf_data_access::chariott::service_discovery::core::v1::{ service_registry_client::ServiceRegistryClient, DiscoverRequest, }; -use log::{debug, info, error}; +use log::{debug, info}; use serde_derive::Deserialize; -use tokio::time::timeout; -use std::any::Any; -use std::{env, thread, panic}; +use std::env; use std::future::Future; use strum_macros::Display; -use tokio::{time::{sleep, Duration}, runtime::Handle}; +use tokio::time::{sleep, Duration}; use tonic::{Request, Status}; const IBEJI_HOME_VAR_NAME: &str = "IBEJI_HOME"; @@ -220,65 +218,6 @@ pub fn get_uri(uri: &str) -> Result { Ok(uri.to_string()) } -/// Blocks on an asynchronous task, waiting for it to complete. -/// This can be used to execute async code in a sync context. -/// This will start a new thread using the current tokio runtime, -/// and will panic when called outside of a tokio runtime context. -/// This method will also panic if joining with the spawned thread panics, -/// including if the passed-in future panics. -/// -/// For more information on acquiring a runtime handle and the panic conditions, -/// see https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.current -/// -/// # Arguments -/// - `future`: the future to execute -/// - `timeout`: the maximum amount of time that `future` should execute before being cancelled -pub fn block_on(future: F, timeout: Duration) -> Result> -where - F: Future> + Send + 'static, - T: Send + 'static, - E: Send + 'static, -{ - let handle = Handle::current(); - let thread = thread::spawn(move || { - handle.block_on(async move { - tokio::time::timeout(timeout, future).await - }) - }); - - match thread.join() { - Ok(Ok(r)) => r.map_err(|e| SomeError::InnerError(e)), - Ok(Err(_)) => Err(SomeError::TimeoutError), - Err(e) => Err(SomeError::JoinError(format!("{e:?}"))), - } -} - -#[derive(Debug)] -pub enum SomeError { - TimeoutError, - JoinError(String), - InnerError(E), -} - -impl std::error::Error for SomeError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - SomeError::InnerError(e) => Some(e), - _ => None, - } - } -} - -impl std::fmt::Display for SomeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - SomeError::TimeoutError => write!(f, "Execution timed out"), - SomeError::JoinError(s) => write!(f, "Joining thread failed with {s}"), - SomeError::InnerError(e) => write!(f, "Error during execution: {e}"), - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/core/invehicle-digital-twin/src/main.rs b/core/invehicle-digital-twin/src/main.rs index 7083cb9a..ff23e8e8 100644 --- a/core/invehicle-digital-twin/src/main.rs +++ b/core/invehicle-digital-twin/src/main.rs @@ -157,7 +157,7 @@ async fn main() -> Result<(), Box> { }) .collect(); - println!("args: {args:?}"); + debug!("args: {args:?}"); // Setup logging let log_level_arg = args.get("--log-level") diff --git a/core/module/managed_subscribe/Cargo.toml b/core/module/managed_subscribe/Cargo.toml index bac7d4da..49cfde59 100644 --- a/core/module/managed_subscribe/Cargo.toml +++ b/core/module/managed_subscribe/Cargo.toml @@ -20,7 +20,7 @@ serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tonic = { workspace = true } tower = { workspace = true } yaml-rust = { workspace = true } diff --git a/core/protobuf_data_access/Cargo.toml b/core/protobuf_data_access/Cargo.toml index 7beef1fa..f97cef5f 100644 --- a/core/protobuf_data_access/Cargo.toml +++ b/core/protobuf_data_access/Cargo.toml @@ -12,7 +12,7 @@ license = "MIT" prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tonic = { workspace = true } [build-dependencies] diff --git a/samples/streaming/Cargo.toml b/samples/streaming/Cargo.toml index e1ffacd1..1a1b8b41 100644 --- a/samples/streaming/Cargo.toml +++ b/samples/streaming/Cargo.toml @@ -24,7 +24,7 @@ sdl2 = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync"] } tokio-stream = { workspace = true } tonic = { workspace = true } url = { workspace = true } diff --git a/samples/tutorial/Cargo.toml b/samples/tutorial/Cargo.toml index 265285f2..88fe34c7 100644 --- a/samples/tutorial/Cargo.toml +++ b/samples/tutorial/Cargo.toml @@ -17,7 +17,7 @@ samples-protobuf-data-access = { path = "../protobuf_data_access" } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic = { workspace = true } url = { workspace = true } From a4b2b9506d1756efe8c93de3f1fd9ce8b9f7a746 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:49:34 -0800 Subject: [PATCH 5/8] one more revert --- core/module/managed_subscribe/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/module/managed_subscribe/Cargo.toml b/core/module/managed_subscribe/Cargo.toml index 49cfde59..81b63cc2 100644 --- a/core/module/managed_subscribe/Cargo.toml +++ b/core/module/managed_subscribe/Cargo.toml @@ -20,7 +20,7 @@ serde = { workspace = true, features = ["derive"] } serde_derive = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true , features = ["macros", "rt-multi-thread"] } tonic = { workspace = true } tower = { workspace = true } yaml-rust = { workspace = true } From d0b785c0b5e5e187b2c534b24dcf18713bfb1da9 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:55:47 -0800 Subject: [PATCH 6/8] fmt and fix spelling --- .accepted_words.txt | 4 ++++ README.md | 2 +- core/invehicle-digital-twin/src/main.rs | 12 +++++------- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/.accepted_words.txt b/.accepted_words.txt index 2ec8814c..3652c1ab 100644 --- a/.accepted_words.txt +++ b/.accepted_words.txt @@ -8,6 +8,7 @@ CHARIOTT Chariott Chariott's chariott +cfg com config containerized @@ -67,6 +68,7 @@ ps repo Repo rm +RUSTFLAGS rustup sdk sdl @@ -76,6 +78,8 @@ snapd standalone sudo timothee +tokio +Tokio toml Tonic toolchain diff --git a/README.md b/README.md index 10c9f050..49fff3ec 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Ibeji has support for using the [tokio console](https://github.com/tokio-rs/cons RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console ``` -Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging wih the tokio console. +Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging with the tokio console. ## Running the Tests diff --git a/core/invehicle-digital-twin/src/main.rs b/core/invehicle-digital-twin/src/main.rs index ff23e8e8..1949c858 100644 --- a/core/invehicle-digital-twin/src/main.rs +++ b/core/invehicle-digital-twin/src/main.rs @@ -143,10 +143,7 @@ async fn main() -> Result<(), Box> { .skip(1) .map(|arg| { let mut split = arg.split('='); - let key = split - .next() - .expect("Couldn't parse argument key") - .to_owned(); + let key = split.next().expect("Couldn't parse argument key").to_owned(); let val = split.next().map(|v| v.to_owned()); if split.next().is_some() { @@ -160,12 +157,13 @@ async fn main() -> Result<(), Box> { debug!("args: {args:?}"); // Setup logging - let log_level_arg = args.get("--log-level") + let log_level_arg = args + .get("--log-level") .cloned() .unwrap_or(Some(DEFAULT_LOG_LEVEL.to_owned())) .expect("No log-level value provided"); - let log_level = LevelFilter::from_str(log_level_arg.as_str()) - .expect("Could not parse log level"); + let log_level = + LevelFilter::from_str(log_level_arg.as_str()).expect("Could not parse log level"); Builder::new().filter(None, log_level).target(Target::Stdout).init(); #[cfg(feature = "tokio_console")] From 5bc82687847909c2e2afae4b6b4846bed671b323 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 14:02:36 -0800 Subject: [PATCH 7/8] fix rustdoc warnings --- core/common/src/grpc_interceptor.rs | 4 ++-- samples/common/src/constants.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/common/src/grpc_interceptor.rs b/core/common/src/grpc_interceptor.rs index ecf5906d..f8f2d72d 100644 --- a/core/common/src/grpc_interceptor.rs +++ b/core/common/src/grpc_interceptor.rs @@ -140,13 +140,13 @@ where Pin> + Send + 'static>>; /// Implementation of tower's Service trait's poll_ready method. - /// See https://docs.rs/tower/latest/tower/trait.Service.html + /// See fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } /// Implementation of tower's Service trait's call method. - /// See https://docs.rs/tower/latest/tower/trait.Service.html + /// See fn call( &mut self, mut request: http::request::Request, diff --git a/samples/common/src/constants.rs b/samples/common/src/constants.rs index 8e861898..d03205d5 100644 --- a/samples/common/src/constants.rs +++ b/samples/common/src/constants.rs @@ -29,7 +29,7 @@ pub mod chariott { } /// Media/MIME types. -/// Common MIME types can be found here: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types +/// Common MIME types can be found here: pub mod mime_type { pub const JPEG_IMAGES: &str = "image/jpeg"; } From 6c752f9eb9544e49ac022275c06e20aaacc26d56 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Mon, 27 Nov 2023 14:40:37 -0800 Subject: [PATCH 8/8] update markdown workflow --- .github/workflows/markdown-ci.yml | 4 +++- .markdownlinkcheck.jsonc => .markdownlinkcheck.json | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) rename .markdownlinkcheck.jsonc => .markdownlinkcheck.json (78%) diff --git a/.github/workflows/markdown-ci.yml b/.github/workflows/markdown-ci.yml index 659ac6d9..12ac6250 100644 --- a/.github/workflows/markdown-ci.yml +++ b/.github/workflows/markdown-ci.yml @@ -8,6 +8,8 @@ on: - 'docs/**' - '**.md' - '**.markdown' + - '.markdownlint.jsonc' + - '.markdownlinkcheck.json' jobs: lint-markdown: @@ -26,4 +28,4 @@ jobs: - uses: actions/checkout@v2 - run: | npm install markdown-link-check - find . -type d \( -name node_modules -o -name .github \) -prune -o -type f -name '*.md' -print0 | xargs -0 -n1 node_modules/.bin/markdown-link-check --config .markdownlinkcheck.jsonc --quiet + find . -type d \( -name node_modules -o -name .github \) -prune -o -type f -name '*.md' -print0 | xargs -0 -n1 node_modules/.bin/markdown-link-check --config .markdownlinkcheck.json --quiet diff --git a/.markdownlinkcheck.jsonc b/.markdownlinkcheck.json similarity index 78% rename from .markdownlinkcheck.jsonc rename to .markdownlinkcheck.json index b2446d8d..6e71c9d3 100644 --- a/.markdownlinkcheck.jsonc +++ b/.markdownlinkcheck.json @@ -2,6 +2,10 @@ "ignorePatterns": [ { "pattern": "^http://localhost" + }, + { + "_patternComment": "Checking aka.ms links is unstable and frequently fails on valid links", + "pattern": "^https://aka.ms" } ], "aliveStatusCodes": [