From 92d0019990f673f9b5feb5cc0eab8d568bbc7cd7 Mon Sep 17 00:00:00 2001 From: chesedo Date: Tue, 14 Feb 2023 16:03:23 +0200 Subject: [PATCH 1/8] feat: router --- Cargo.lock | 1 + auth/Cargo.toml | 1 + auth/src/args.rs | 2 +- auth/src/lib.rs | 13 +++++++++++++ auth/src/main.rs | 4 +++- auth/src/router.rs | 31 +++++++++++++++++++++++++++++++ 6 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 auth/src/router.rs diff --git a/Cargo.lock b/Cargo.lock index 940abc5f2..834019a21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5803,6 +5803,7 @@ dependencies = [ name = "shuttle-auth" version = "0.1.0" dependencies = [ + "axum", "clap 4.0.27", "opentelemetry", "opentelemetry-datadog", diff --git a/auth/Cargo.toml b/auth/Cargo.toml index b7a58aac8..9feac3864 100644 --- a/auth/Cargo.toml +++ b/auth/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +axum = { workspace = true } clap = { workspace = true } opentelemetry = { workspace = true } opentelemetry-datadog = { workspace = true } diff --git a/auth/src/args.rs b/auth/src/args.rs index c2ec9a313..b7953c18a 100644 --- a/auth/src/args.rs +++ b/auth/src/args.rs @@ -10,5 +10,5 @@ pub struct Args { /// Address to bind to #[arg(long, default_value = "127.0.0.1:8000")] - pub user: SocketAddr, + pub address: SocketAddr, } diff --git a/auth/src/lib.rs b/auth/src/lib.rs index fdbbba1fa..fdcbdff6b 100644 --- a/auth/src/lib.rs +++ b/auth/src/lib.rs @@ -1,3 +1,16 @@ mod args; +mod router; pub use args::Args; +use tracing::info; + +pub async fn start(args: Args) { + let router = router::new(); + + info!(address=%args.address, "Binding to and listening at address"); + + axum::Server::bind(&args.address) + .serve(router.into_make_service()) + .await + .unwrap_or_else(|_| panic!("Failed to bind to address: {}", args.address)); +} diff --git a/auth/src/main.rs b/auth/src/main.rs index 2ed41bfd0..a12558859 100644 --- a/auth/src/main.rs +++ b/auth/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use opentelemetry::global; -use shuttle_auth::Args; +use shuttle_auth::{start, Args}; use tracing::trace; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -43,4 +43,6 @@ async fn main() { // .unwrap() // .to_string_lossy() // ); + + start(args).await; } diff --git a/auth/src/router.rs b/auth/src/router.rs new file mode 100644 index 000000000..768b57d60 --- /dev/null +++ b/auth/src/router.rs @@ -0,0 +1,31 @@ +use axum::{ + routing::{get, post}, + Router, +}; + +pub fn new() -> Router { + Router::new() + .route("/login", post(login)) + .route("/logout", post(logout)) + .route("/auth/session", post(convert_cookie)) + .route("/auth/key", post(convert_key)) + .route("/auth/refresh", post(refresh_token)) + .route("/public-key", get(get_public_key)) + .route("/user/:account_name", get(get_user).post(post_user)) +} + +async fn login() {} + +async fn logout() {} + +async fn convert_cookie() {} + +async fn convert_key() {} + +async fn refresh_token() {} + +async fn get_public_key() {} + +async fn get_user() {} + +async fn post_user() {} From 73116f09ae77370f9e93a91830b91a4ed701611f Mon Sep 17 00:00:00 2001 From: chesedo Date: Tue, 14 Feb 2023 16:21:21 +0200 Subject: [PATCH 2/8] feat: tracing and propagation --- Cargo.lock | 2 ++ Cargo.toml | 5 +++-- auth/Cargo.toml | 5 +++++ auth/src/router.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++ deployer/Cargo.toml | 2 +- gateway/Cargo.toml | 2 +- 6 files changed, 62 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 834019a21..3a8ece241 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5808,7 +5808,9 @@ dependencies = [ "opentelemetry", "opentelemetry-datadog", "opentelemetry-http", + "shuttle-common", "tokio", + "tower-http 0.3.4", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index e5788783b..fa87f6c22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,11 @@ once_cell = "1.16.0" opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } opentelemetry-datadog = { version = "0.6.0", features = ["reqwest-client"] } opentelemetry-http = "0.7.0" -uuid = "1.2.2" -thiserror = "1.0.37" serde = "1.0.148" serde_json = "1.0.89" +thiserror = "1.0.37" +tower-http = { version = "0.3.4", features = ["trace"] } tracing = "0.1.37" tracing-opentelemetry = "0.18.0" tracing-subscriber = "0.3.16" +uuid = "1.2.2" diff --git a/auth/Cargo.toml b/auth/Cargo.toml index 9feac3864..6bf82ce64 100644 --- a/auth/Cargo.toml +++ b/auth/Cargo.toml @@ -12,6 +12,11 @@ opentelemetry = { workspace = true } opentelemetry-datadog = { workspace = true } opentelemetry-http = { workspace = true } tokio = { version = "1.22.0", features = [ "full" ] } +tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } + +[dependencies.shuttle-common] +workspace = true +features = ["backend"] diff --git a/auth/src/router.rs b/auth/src/router.rs index 768b57d60..d51673923 100644 --- a/auth/src/router.rs +++ b/auth/src/router.rs @@ -1,7 +1,19 @@ +use std::time::Duration; + use axum::{ + body::{Body, BoxBody}, + extract::MatchedPath, + middleware::from_extractor, + response::Response, routing::{get, post}, Router, }; +use opentelemetry::global; +use opentelemetry_http::{HeaderExtractor, Request}; +use shuttle_common::backends::metrics::Metrics; +use tower_http::trace::TraceLayer; +use tracing::{debug, debug_span, field, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; pub fn new() -> Router { Router::new() @@ -12,6 +24,44 @@ pub fn new() -> Router { .route("/auth/refresh", post(refresh_token)) .route("/public-key", get(get_public_key)) .route("/user/:account_name", get(get_user).post(post_user)) + .route_layer(from_extractor::()) + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &Request| { + let path = if let Some(path) = request.extensions().get::() { + path.as_str() + } else { + "" + }; + + let span = debug_span!( + "request", + http.uri = %request.uri(), + http.method = %request.method(), + http.status_code = field::Empty, + // A bunch of extra things for metrics + // Should be able to make this clearer once `Valuable` support lands in tracing + request.path = path, + request.params.account_name = field::Empty, + ); + + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + span.set_parent(parent_context); + + span + }) + .on_response( + |response: &Response, latency: Duration, span: &Span| { + span.record("http.status_code", response.status().as_u16()); + debug!( + latency = format_args!("{} ns", latency.as_nanos()), + "finished processing request" + ); + }, + ), + ) } async fn login() {} diff --git a/deployer/Cargo.toml b/deployer/Cargo.toml index 7a0dc8458..416de0491 100644 --- a/deployer/Cargo.toml +++ b/deployer/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.22.0", features = ["fs"] } toml = "0.5.9" tonic = "0.8.3" tower = { version = "0.4.13", features = ["make"] } -tower-http = { version = "0.3.4", features = ["auth", "trace"] } +tower-http = { workspace = true, features = ["auth"] } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index e66314b68..f8d47a2ca 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -38,7 +38,7 @@ sqlx = { version = "0.6.2", features = [ "sqlite", "json", "runtime-tokio-native strum = { version = "0.24.1", features = ["derive"] } tokio = { version = "1.22.0", features = [ "full" ] } tower = { version = "0.4.13", features = [ "steer" ] } -tower-http = { version = "0.3.4", features = ["trace"] } +tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } From bf3f7bf9b5426a2892c1a8e197d1d23d6de2f74e Mon Sep 17 00:00:00 2001 From: chesedo Date: Wed, 15 Feb 2023 10:10:28 +0200 Subject: [PATCH 3/8] refactor: request tracing --- Cargo.lock | 7 +- auth/Cargo.toml | 2 - auth/src/router.rs | 52 ++-------- common/Cargo.toml | 6 +- common/src/backends/metrics.rs | 177 ++++++++++++++++++++++++++++++++- deployer/src/handlers/mod.rs | 63 +++--------- gateway/Cargo.toml | 1 - gateway/src/api/latest.rs | 47 +++------ 8 files changed, 216 insertions(+), 139 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a8ece241..2f6eee440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5807,10 +5807,8 @@ dependencies = [ "clap 4.0.27", "opentelemetry", "opentelemetry-datadog", - "opentelemetry-http", "shuttle-common", "tokio", - "tower-http 0.3.4", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -5840,12 +5838,16 @@ dependencies = [ "crossterm", "http 0.2.8", "once_cell", + "opentelemetry", + "opentelemetry-http", "reqwest", "rustrict", "serde", "serde_json", "strum", + "tower-http 0.3.4", "tracing", + "tracing-opentelemetry", "uuid 1.2.2", ] @@ -5938,7 +5940,6 @@ dependencies = [ "tempfile", "tokio", "tower", - "tower-http 0.3.4", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/auth/Cargo.toml b/auth/Cargo.toml index 6bf82ce64..ce4ebb29a 100644 --- a/auth/Cargo.toml +++ b/auth/Cargo.toml @@ -10,9 +10,7 @@ axum = { workspace = true } clap = { workspace = true } opentelemetry = { workspace = true } opentelemetry-datadog = { workspace = true } -opentelemetry-http = { workspace = true } tokio = { version = "1.22.0", features = [ "full" ] } -tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/auth/src/router.rs b/auth/src/router.rs index d51673923..063d5061a 100644 --- a/auth/src/router.rs +++ b/auth/src/router.rs @@ -1,19 +1,10 @@ -use std::time::Duration; - use axum::{ - body::{Body, BoxBody}, - extract::MatchedPath, middleware::from_extractor, - response::Response, routing::{get, post}, Router, }; -use opentelemetry::global; -use opentelemetry_http::{HeaderExtractor, Request}; -use shuttle_common::backends::metrics::Metrics; -use tower_http::trace::TraceLayer; -use tracing::{debug, debug_span, field, Span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use shuttle_common::backends::metrics::{Metrics, TraceLayer}; +use tracing::field; pub fn new() -> Router { Router::new() @@ -26,41 +17,10 @@ pub fn new() -> Router { .route("/user/:account_name", get(get_user).post(post_user)) .route_layer(from_extractor::()) .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &Request| { - let path = if let Some(path) = request.extensions().get::() { - path.as_str() - } else { - "" - }; - - let span = debug_span!( - "request", - http.uri = %request.uri(), - http.method = %request.method(), - http.status_code = field::Empty, - // A bunch of extra things for metrics - // Should be able to make this clearer once `Valuable` support lands in tracing - request.path = path, - request.params.account_name = field::Empty, - ); - - let parent_context = global::get_text_map_propagator(|propagator| { - propagator.extract(&HeaderExtractor(request.headers())) - }); - span.set_parent(parent_context); - - span - }) - .on_response( - |response: &Response, latency: Duration, span: &Span| { - span.record("http.status_code", response.status().as_u16()); - debug!( - latency = format_args!("{} ns", latency.as_nanos()), - "finished processing request" - ); - }, - ), + TraceLayer::new() + .extra_fields(|_| vec![("request.params.account_name", Box::new(field::Empty))]) + .with_propagation() + .build(), ) } diff --git a/common/Cargo.toml b/common/Cargo.toml index 262074849..8697c7574 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -16,15 +16,19 @@ comfy-table = { version = "6.1.3", optional = true } crossterm = { version = "0.25.0", optional = true } http = { version = "0.2.8", optional = true } once_cell = { workspace = true } +opentelemetry = { workspace = true, optional = true } +opentelemetry-http = { workspace = true, optional = true } reqwest = { version = "0.11.13", optional = true } rustrict = "0.5.5" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, optional = true } strum = { version = "0.24.1", features = ["derive"] } +tower-http = { workspace = true, optional = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true, optional = true } uuid = { workspace = true, features = ["v4", "serde"] } [features] -backend = ["async-trait", "axum"] +backend = ["async-trait", "axum", "opentelemetry", "opentelemetry-http", "tower-http", "tracing-opentelemetry"] display = ["comfy-table", "crossterm"] models = ["anyhow", "async-trait", "display", "http", "reqwest", "serde_json"] diff --git a/common/src/backends/metrics.rs b/common/src/backends/metrics.rs index 9dabc5a1c..0b3c7437c 100644 --- a/common/src/backends/metrics.rs +++ b/common/src/backends/metrics.rs @@ -1,13 +1,21 @@ +use std::marker::PhantomData; +use std::time::Duration; use std::{collections::HashMap, convert::Infallible}; use async_trait::async_trait; -use axum::extract::{FromRequestParts, Path}; -use axum::http::request::Parts; -use tracing::Span; +use axum::body::{Body, BoxBody}; +use axum::extract::{FromRequestParts, MatchedPath, Path}; +use axum::http::{request::Parts, Request, Response}; +use opentelemetry::global; +use opentelemetry_http::HeaderExtractor; +use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier}; +use tower_http::trace::DefaultOnRequest; +use tracing::{debug, debug_span, field, Span, Value}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// Used to record a bunch of metrics info /// The tracing layer on the server should record a `request.params.` field for each parameter -/// that should be recorded +/// that should be recorded. And the [TraceLayer] can be used to record the default `request.params.` pub struct Metrics; #[async_trait] @@ -33,3 +41,164 @@ where Ok(Metrics) } } + +type FnFields = fn(&Request) -> Vec<(&str, Box)>; + +/// Record the default tracing information for each request. These defaults are: +/// - The URI +/// - The method +/// - The status code +/// - The request path +pub struct TraceLayer { + fn_extra_fields: FnFields, + make_span_type: PhantomData, +} + +impl Default for TraceLayer { + fn default() -> Self { + Self::new() + } +} + +impl TraceLayer { + pub fn new() -> Self { + Self { + fn_extra_fields: |_| Default::default(), + make_span_type: PhantomData, + } + } + + /// Set a function to record extra tracing fields. These might be fields set by [Metrics]. + /// + /// # Example + /// ``` + /// TraceLayer::new() + /// .extra_fields(|_| vec![("request.params.account_name", &field::Empty)]) + /// .build() + /// ``` + pub fn extra_fields(mut self, fn_extra_fields: FnFields) -> Self { + self.fn_extra_fields = fn_extra_fields; + + self + } +} + +impl + MakeSpanBuilder> TraceLayer { + /// Build the configured tracing layer + pub fn build( + self, + ) -> tower_http::trace::TraceLayer< + SharedClassifier, + MakeSpan, + DefaultOnRequest, + OnResponseStatusCode, + > { + tower_http::trace::TraceLayer::new_for_http() + .make_span_with(MakeSpan::new(self.fn_extra_fields)) + .on_response(OnResponseStatusCode) + } +} + +impl TraceLayer { + /// Switch to the span maker which does not add propagation details from the request headers + pub fn without_propagation(self) -> Self { + self + } +} + +impl TraceLayer { + /// Switch to the span maker which add propagation details from the request headers + pub fn with_propagation(self) -> Self { + self + } +} + +/// Helper trait to make a new span maker +pub trait MakeSpanBuilder { + fn new(fn_extra_fields: FnFields) -> Self; +} + +/// Simple span maker which records the default traces with the extra given by the user +#[derive(Clone)] +pub struct MakeSpanSimple { + fn_extra_fields: FnFields, +} + +impl MakeSpanBuilder for MakeSpanSimple { + fn new(fn_extra_fields: FnFields) -> Self { + Self { fn_extra_fields } + } +} + +impl tower_http::trace::MakeSpan for MakeSpanSimple { + fn make_span(&mut self, request: &Request) -> Span { + get_span(request, self.fn_extra_fields) + } +} + +/// Span maker which records the default traces, those given by the user and extract a propagation context +/// from the request headers. +#[derive(Clone)] +pub struct MakeSpanPropagation { + fn_extra_fields: FnFields, +} + +impl MakeSpanBuilder for MakeSpanPropagation { + fn new(fn_extra_fields: FnFields) -> Self { + Self { fn_extra_fields } + } +} + +impl tower_http::trace::MakeSpan for MakeSpanPropagation { + fn make_span(&mut self, request: &Request) -> Span { + let span = get_span(request, self.fn_extra_fields); + + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + span.set_parent(parent_context); + + span + } +} + +/// Extract and records the status code from the response. And logs out timing info +#[derive(Clone)] +pub struct OnResponseStatusCode; + +impl tower_http::trace::OnResponse for OnResponseStatusCode { + fn on_response(self, response: &Response, latency: Duration, span: &Span) { + span.record("http.status_code", response.status().as_u16()); + debug!( + latency = format_args!("{} ns", latency.as_nanos()), + "finished processing request" + ); + } +} + +#[inline] +fn get_span(request: &Request, fn_extra_fields: FnFields) -> Span { + let path = if let Some(path) = request.extensions().get::() { + path.as_str() + } else { + "" + }; + + let span = debug_span!( + "request", + http.uri = %request.uri(), + http.method = %request.method(), + http.status_code = field::Empty, + // A bunch of extra things for metrics + // Should be able to make this clearer once `Valuable` support lands in tracing + request.path = path, + ); + + let extra_fields = (fn_extra_fields)(request); + + for (key, value) in extra_fields { + span.record(key, value); + } + + span +} diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 65152d767..a5ca42ace 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -1,9 +1,7 @@ mod error; -use axum::body::{Body, BoxBody}; use axum::extract::ws::{self, WebSocket}; -use axum::extract::{Extension, MatchedPath, Path, Query}; -use axum::http::{Request, Response}; +use axum::extract::{Extension, Path, Query}; use axum::middleware::from_extractor; use axum::routing::{get, post, Router}; use axum::{extract::BodyStream, Json}; @@ -11,24 +9,19 @@ use bytes::BufMut; use chrono::{TimeZone, Utc}; use fqdn::FQDN; use futures::StreamExt; -use opentelemetry::global; -use opentelemetry_http::HeaderExtractor; -use shuttle_common::backends::metrics::Metrics; +use shuttle_common::backends::metrics::{Metrics, TraceLayer}; use shuttle_common::models::secret; use shuttle_common::project::ProjectName; use shuttle_common::LogItem; use shuttle_service::loader::clean_crate; use tower_http::auth::RequireAuthorizationLayer; -use tower_http::trace::TraceLayer; -use tracing::{debug, debug_span, error, field, instrument, trace, Span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{debug, error, field, instrument, trace}; use uuid::Uuid; use crate::deployment::{DeploymentManager, Queued}; use crate::persistence::{Deployment, Log, Persistence, SecretGetter, State}; use std::collections::HashMap; -use std::time::Duration; pub use {self::error::Error, self::error::Result}; @@ -76,48 +69,22 @@ pub fn make_router( .route("/projects/:project_name/status", get(get_status)) .route_layer(from_extractor::()) .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &Request| { - let path = if let Some(path) = request.extensions().get::() { - path.as_str() - } else { - "" - }; - + TraceLayer::new() + .extra_fields(|request| { let account_name = request .headers() .get("X-Shuttle-Account-Name") - .map(|value| value.to_str().unwrap_or_default()); - - let span = debug_span!( - "request", - http.uri = %request.uri(), - http.method = %request.method(), - http.status_code = field::Empty, - account.name = account_name, - // A bunch of extra things for metrics - // Should be able to make this clearer once `Valuable` support lands in tracing - request.path = path, - request.params.project_name = field::Empty, - request.params.service_name = field::Empty, - request.params.deployment_id = field::Empty, - ); - let parent_context = global::get_text_map_propagator(|propagator| { - propagator.extract(&HeaderExtractor(request.headers())) - }); - span.set_parent(parent_context); - - span + .map(|value| value.to_str().unwrap_or_default().to_string()); + + vec![ + ("account.name", Box::new(account_name)), + ("request.params.project_name", Box::new(field::Empty)), + ("request.params.service_name", Box::new(field::Empty)), + ("request.params.deployment_id", Box::new(field::Empty)), + ] }) - .on_response( - |response: &Response, latency: Duration, span: &Span| { - span.record("http.status_code", response.status().as_u16()); - debug!( - latency = format_args!("{} ns", latency.as_nanos()), - "finished processing request" - ); - }, - ), + .with_propagation() + .build(), ) .route_layer(from_extractor::()) .layer(Extension(project_name)) diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index f8d47a2ca..61cbbdc65 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -38,7 +38,6 @@ sqlx = { version = "0.6.2", features = [ "sqlite", "json", "runtime-tokio-native strum = { version = "0.24.1", features = ["derive"] } tokio = { version = "1.22.0", features = [ "full" ] } tower = { version = "0.4.13", features = [ "steer" ] } -tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index b9210cc97..c34ca6533 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -3,8 +3,8 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use axum::body::{Body, BoxBody}; -use axum::extract::{Extension, MatchedPath, Path, State}; +use axum::body::Body; +use axum::extract::{Extension, Path, State}; use axum::http::Request; use axum::middleware::from_extractor; use axum::response::Response; @@ -15,13 +15,12 @@ use futures::Future; use http::StatusCode; use instant_acme::{AccountCredentials, ChallengeType}; use serde::{Deserialize, Serialize}; -use shuttle_common::backends::metrics::Metrics; +use shuttle_common::backends::metrics::{Metrics, TraceLayer}; use shuttle_common::models::error::ErrorKind; use shuttle_common::models::{project, stats, user}; use tokio::sync::mpsc::Sender; use tokio::sync::{Mutex, MutexGuard}; -use tower_http::trace::TraceLayer; -use tracing::{debug, debug_span, field, instrument, Span}; +use tracing::{field, instrument}; use ttl_cache::TtlCache; use uuid::Uuid; @@ -436,36 +435,16 @@ impl ApiBuilder { pub fn with_default_traces(mut self) -> Self { self.router = self.router.route_layer(from_extractor::()).layer( - TraceLayer::new_for_http() - .make_span_with(|request: &Request| { - let path = if let Some(path) = request.extensions().get::() { - path.as_str() - } else { - "" - }; - - debug_span!( - "request", - http.uri = %request.uri(), - http.method = %request.method(), - http.status_code = field::Empty, - account.name = field::Empty, - // A bunch of extra things for metrics - // Should be able to make this clearer once `Valuable` support lands in tracing - request.path = path, - request.params.project_name = field::Empty, - request.params.account_name = field::Empty, - ) + TraceLayer::new() + .extra_fields(|_| { + vec![ + ("account_name", Box::new(field::Empty)), + ("request.params.project_name", Box::new(field::Empty)), + ("request.params.account_name", Box::new(field::Empty)), + ] }) - .on_response( - |response: &Response, latency: Duration, span: &Span| { - span.record("http.status_code", response.status().as_u16()); - debug!( - latency = format_args!("{} ns", latency.as_nanos()), - "finished processing request" - ); - }, - ), + .without_propagation() + .build(), ); self } From b171d928ef2ea0fa7394c9460f99f3de7d094383 Mon Sep 17 00:00:00 2001 From: chesedo Date: Wed, 15 Feb 2023 12:10:56 +0200 Subject: [PATCH 4/8] tests: TraceLayer --- Cargo.lock | 15 +++ auth/src/router.rs | 14 +- common/Cargo.toml | 7 + common/src/backends/metrics.rs | 235 ++++++++++++++++++++++++--------- deployer/src/handlers/mod.rs | 34 ++--- gateway/src/api/latest.rs | 21 +-- 6 files changed, 234 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f6eee440..20573c774 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5845,9 +5845,13 @@ dependencies = [ "serde", "serde_json", "strum", + "tokio", + "tower", "tower-http 0.3.4", "tracing", + "tracing-fluent-assertions", "tracing-opentelemetry", + "tracing-subscriber", "uuid 1.2.2", ] @@ -7086,6 +7090,17 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-fluent-assertions" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12de1a8c6bcfee614305e836308b596bbac831137a04c61f7e5b0b0bf2cfeaf6" +dependencies = [ + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-futures" version = "0.2.5" diff --git a/auth/src/router.rs b/auth/src/router.rs index 063d5061a..5aa59dbde 100644 --- a/auth/src/router.rs +++ b/auth/src/router.rs @@ -3,7 +3,10 @@ use axum::{ routing::{get, post}, Router, }; -use shuttle_common::backends::metrics::{Metrics, TraceLayer}; +use shuttle_common::{ + backends::metrics::{Metrics, TraceLayer}, + request_span, +}; use tracing::field; pub fn new() -> Router { @@ -17,10 +20,11 @@ pub fn new() -> Router { .route("/user/:account_name", get(get_user).post(post_user)) .route_layer(from_extractor::()) .layer( - TraceLayer::new() - .extra_fields(|_| vec![("request.params.account_name", Box::new(field::Empty))]) - .with_propagation() - .build(), + TraceLayer::new(|request| { + request_span!(request, request.params.account_name = field::Empty) + }) + .with_propagation() + .build(), ) } diff --git a/common/Cargo.toml b/common/Cargo.toml index 8697c7574..87709962d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -32,3 +32,10 @@ uuid = { workspace = true, features = ["v4", "serde"] } backend = ["async-trait", "axum", "opentelemetry", "opentelemetry-http", "tower-http", "tracing-opentelemetry"] display = ["comfy-table", "crossterm"] models = ["anyhow", "async-trait", "display", "http", "reqwest", "serde_json"] + +[dev-dependencies] +axum = { workspace = true } +tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] } +tower = { version = "0.4", features = ["util"] } +tracing-fluent-assertions = "0.3.0" +tracing-subscriber = { version = "0.3", default-features = false } diff --git a/common/src/backends/metrics.rs b/common/src/backends/metrics.rs index 0b3c7437c..e88b6967d 100644 --- a/common/src/backends/metrics.rs +++ b/common/src/backends/metrics.rs @@ -4,13 +4,13 @@ use std::{collections::HashMap, convert::Infallible}; use async_trait::async_trait; use axum::body::{Body, BoxBody}; -use axum::extract::{FromRequestParts, MatchedPath, Path}; +use axum::extract::{FromRequestParts, Path}; use axum::http::{request::Parts, Request, Response}; use opentelemetry::global; use opentelemetry_http::HeaderExtractor; use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier}; use tower_http::trace::DefaultOnRequest; -use tracing::{debug, debug_span, field, Span, Value}; +use tracing::{debug, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; /// Used to record a bunch of metrics info @@ -42,44 +42,32 @@ where } } -type FnFields = fn(&Request) -> Vec<(&str, Box)>; +type FnSpan = fn(&Request) -> Span; -/// Record the default tracing information for each request. These defaults are: -/// - The URI -/// - The method -/// - The status code -/// - The request path +/// Record the tracing information for each request as given by the function to create a span pub struct TraceLayer { - fn_extra_fields: FnFields, + fn_span: FnSpan, make_span_type: PhantomData, } - -impl Default for TraceLayer { - fn default() -> Self { - Self::new() - } -} - impl TraceLayer { - pub fn new() -> Self { - Self { - fn_extra_fields: |_| Default::default(), - make_span_type: PhantomData, - } - } - - /// Set a function to record extra tracing fields. These might be fields set by [Metrics]. + /// Create a trace layer using the give function to create spans. The span fields might be set by [Metrics] later. /// /// # Example /// ``` - /// TraceLayer::new() - /// .extra_fields(|_| vec![("request.params.account_name", &field::Empty)]) - /// .build() + /// TraceLayer::new(|request| { + /// request_span!( + /// request, + /// request.params.param = field::Empty + /// ) + /// }) + /// .without_propagation() + /// .build(); /// ``` - pub fn extra_fields(mut self, fn_extra_fields: FnFields) -> Self { - self.fn_extra_fields = fn_extra_fields; - - self + pub fn new(fn_span: FnSpan) -> Self { + Self { + fn_span, + make_span_type: PhantomData, + } } } @@ -94,7 +82,7 @@ impl + MakeSpanBuilder> TraceLayer { tower_http::trace::TraceLayer::new_for_http() - .make_span_with(MakeSpan::new(self.fn_extra_fields)) + .make_span_with(MakeSpan::new(self.fn_span)) .on_response(OnResponseStatusCode) } } @@ -115,24 +103,24 @@ impl TraceLayer { /// Helper trait to make a new span maker pub trait MakeSpanBuilder { - fn new(fn_extra_fields: FnFields) -> Self; + fn new(fn_span: FnSpan) -> Self; } /// Simple span maker which records the default traces with the extra given by the user #[derive(Clone)] pub struct MakeSpanSimple { - fn_extra_fields: FnFields, + fn_span: FnSpan, } impl MakeSpanBuilder for MakeSpanSimple { - fn new(fn_extra_fields: FnFields) -> Self { - Self { fn_extra_fields } + fn new(fn_span: FnSpan) -> Self { + Self { fn_span } } } impl tower_http::trace::MakeSpan for MakeSpanSimple { fn make_span(&mut self, request: &Request) -> Span { - get_span(request, self.fn_extra_fields) + (self.fn_span)(request) } } @@ -140,18 +128,18 @@ impl tower_http::trace::MakeSpan for MakeSpanSimple { /// from the request headers. #[derive(Clone)] pub struct MakeSpanPropagation { - fn_extra_fields: FnFields, + fn_span: FnSpan, } impl MakeSpanBuilder for MakeSpanPropagation { - fn new(fn_extra_fields: FnFields) -> Self { - Self { fn_extra_fields } + fn new(fn_span: FnSpan) -> Self { + Self { fn_span } } } impl tower_http::trace::MakeSpan for MakeSpanPropagation { fn make_span(&mut self, request: &Request) -> Span { - let span = get_span(request, self.fn_extra_fields); + let span = (self.fn_span)(request); let parent_context = global::get_text_map_propagator(|propagator| { propagator.extract(&HeaderExtractor(request.headers())) @@ -176,29 +164,156 @@ impl tower_http::trace::OnResponse for OnResponseStatusCode { } } -#[inline] -fn get_span(request: &Request, fn_extra_fields: FnFields) -> Span { - let path = if let Some(path) = request.extensions().get::() { - path.as_str() - } else { - "" +/// Simple macro to record the following default for each request: +/// - The URI +/// - The method +/// - The status code +/// - The request path +#[macro_export] +macro_rules! request_span { + ($request:expr, $($field:tt)*) => { + { + let path = if let Some(path) = $request.extensions().get::() { + path.as_str() + } else { + "" + }; + + tracing::debug_span!( + "request", + http.uri = %$request.uri(), + http.method = %$request.method(), + http.status_code = tracing::field::Empty, + // A bunch of extra things for metrics + // Should be able to make this clearer once `Valuable` support lands in tracing + request.path = path, + $($field)* + ) + } + }; + ($request:expr) => { + $crate::request_span!($request, ) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use axum::{ + body::Body, extract::Path, http::Request, middleware::from_extractor, + response::IntoResponse, routing::get, Router, }; + use once_cell::sync::OnceCell; + use tokio::time::sleep; + use tower::ServiceExt; + use tracing::field; + use tracing_fluent_assertions::{AssertionRegistry, AssertionsLayer}; + use tracing_subscriber::{layer::SubscriberExt, Registry}; + + use super::{Metrics, TraceLayer}; + + async fn hello() -> impl IntoResponse { + "hello" + } + + async fn hello_user(Path(user_name): Path) -> impl IntoResponse { + format!("hello {user_name}") + } + + fn get_assertion_registry() -> &'static AssertionRegistry { + static REGISTRY: OnceCell = OnceCell::new(); + REGISTRY.get_or_init(|| { + let assertion_registry = AssertionRegistry::default(); + let base_subscriber = Registry::default(); + let subscriber = base_subscriber.with(AssertionsLayer::new(&assertion_registry)); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + assertion_registry + }) + } + + #[tokio::test] + async fn basic() { + let assertion_registry = get_assertion_registry(); + let router: Router<()> = Router::new() + .route("/hello", get(hello)) + .route_layer(from_extractor::()) + .layer( + TraceLayer::new(|request| request_span!(request)) + .without_propagation() + .build(), + ); + + let request_span = assertion_registry + .build() + .with_name("request") + .with_span_field("http.uri") + .with_span_field("http.method") + .with_span_field("http.status_code") + .with_span_field("request.path") + .was_created() + .finalize(); - let span = debug_span!( - "request", - http.uri = %request.uri(), - http.method = %request.method(), - http.status_code = field::Empty, - // A bunch of extra things for metrics - // Should be able to make this clearer once `Valuable` support lands in tracing - request.path = path, - ); + router + .oneshot( + Request::builder() + .uri("/hello") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); - let extra_fields = (fn_extra_fields)(request); + // Give time for the span to be recorded + sleep(Duration::from_millis(100)).await; - for (key, value) in extra_fields { - span.record(key, value); + request_span.assert(); } - span + #[tokio::test] + async fn complex() { + let assertion_registry = get_assertion_registry(); + let router: Router<()> = Router::new() + .route("/hello/:user_name", get(hello_user)) + .route_layer(from_extractor::()) + .layer( + TraceLayer::new(|request| { + request_span!( + request, + request.params.user_name = field::Empty, + extra = "value" + ) + }) + .without_propagation() + .build(), + ); + + let request_span = assertion_registry + .build() + .with_name("request") + .with_span_field("http.uri") + .with_span_field("http.method") + .with_span_field("http.status_code") + .with_span_field("request.path") + .with_span_field("request.params.user_name") + .with_span_field("extra") + .was_created() + .finalize(); + + router + .oneshot( + Request::builder() + .uri("/hello/ferries") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Give time for the span to be recorded + sleep(Duration::from_millis(100)).await; + + request_span.assert(); + } } diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index a5ca42ace..2d9b0bab1 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -12,7 +12,7 @@ use futures::StreamExt; use shuttle_common::backends::metrics::{Metrics, TraceLayer}; use shuttle_common::models::secret; use shuttle_common::project::ProjectName; -use shuttle_common::LogItem; +use shuttle_common::{request_span, LogItem}; use shuttle_service::loader::clean_crate; use tower_http::auth::RequireAuthorizationLayer; use tracing::{debug, error, field, instrument, trace}; @@ -69,22 +69,22 @@ pub fn make_router( .route("/projects/:project_name/status", get(get_status)) .route_layer(from_extractor::()) .layer( - TraceLayer::new() - .extra_fields(|request| { - let account_name = request - .headers() - .get("X-Shuttle-Account-Name") - .map(|value| value.to_str().unwrap_or_default().to_string()); - - vec![ - ("account.name", Box::new(account_name)), - ("request.params.project_name", Box::new(field::Empty)), - ("request.params.service_name", Box::new(field::Empty)), - ("request.params.deployment_id", Box::new(field::Empty)), - ] - }) - .with_propagation() - .build(), + TraceLayer::new(|request| { + let account_name = request + .headers() + .get("X-Shuttle-Account-Name") + .map(|value| value.to_str().unwrap_or_default().to_string()); + + request_span!( + request, + account.name = account_name, + request.params.project_name = field::Empty, + request.params.service_name = field::Empty, + request.params.deployment_id = field::Empty, + ) + }) + .with_propagation() + .build(), ) .route_layer(from_extractor::()) .layer(Extension(project_name)) diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index c34ca6533..d2403b995 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; use shuttle_common::backends::metrics::{Metrics, TraceLayer}; use shuttle_common::models::error::ErrorKind; use shuttle_common::models::{project, stats, user}; +use shuttle_common::request_span; use tokio::sync::mpsc::Sender; use tokio::sync::{Mutex, MutexGuard}; use tracing::{field, instrument}; @@ -435,16 +436,16 @@ impl ApiBuilder { pub fn with_default_traces(mut self) -> Self { self.router = self.router.route_layer(from_extractor::()).layer( - TraceLayer::new() - .extra_fields(|_| { - vec![ - ("account_name", Box::new(field::Empty)), - ("request.params.project_name", Box::new(field::Empty)), - ("request.params.account_name", Box::new(field::Empty)), - ] - }) - .without_propagation() - .build(), + TraceLayer::new(|request| { + request_span!( + request, + account_name = field::Empty, + request.params.project_name = field::Empty, + request.params.account_name = field::Empty + ) + }) + .without_propagation() + .build(), ); self } From 6d0c031975bf311da3bab9334a8c9a919cb614ca Mon Sep 17 00:00:00 2001 From: chesedo Date: Wed, 15 Feb 2023 12:12:16 +0200 Subject: [PATCH 5/8] refactor: clippy suggestion --- gateway/src/project.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/gateway/src/project.rs b/gateway/src/project.rs index 1f2ed63db..df4a48db4 100644 --- a/gateway/src/project.rs +++ b/gateway/src/project.rs @@ -1463,20 +1463,17 @@ pub mod exec { .inspect_container(safe_unwrap!(container.id), None) .await { - match container.state { - Some(_) => { - _ = gateway - .new_task() - .project(project_name) - .and_then(task::run(|ctx| async move { - TaskResult::Done(Project::Rebooting(ProjectRebooting { - container: ctx.state.container().unwrap(), - })) + if container.state.is_some() { + _ = gateway + .new_task() + .project(project_name) + .and_then(task::run(|ctx| async move { + TaskResult::Done(Project::Rebooting(ProjectRebooting { + container: ctx.state.container().unwrap(), })) - .send(&sender) - .await; - } - _ => {} + })) + .send(&sender) + .await; } } } From 88aa780a03b7d85cb045d32ed4f7e0180811d61a Mon Sep 17 00:00:00 2001 From: chesedo Date: Wed, 15 Feb 2023 12:17:09 +0200 Subject: [PATCH 6/8] refactor: better comments --- common/src/backends/metrics.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/src/backends/metrics.rs b/common/src/backends/metrics.rs index e88b6967d..800d654b9 100644 --- a/common/src/backends/metrics.rs +++ b/common/src/backends/metrics.rs @@ -95,7 +95,7 @@ impl TraceLayer { } impl TraceLayer { - /// Switch to the span maker which add propagation details from the request headers + /// Switch to the span maker which adds propagation details from the request headers pub fn with_propagation(self) -> Self { self } @@ -106,7 +106,7 @@ pub trait MakeSpanBuilder { fn new(fn_span: FnSpan) -> Self; } -/// Simple span maker which records the default traces with the extra given by the user +/// Simple span maker which records the span given by the user #[derive(Clone)] pub struct MakeSpanSimple { fn_span: FnSpan, @@ -124,7 +124,7 @@ impl tower_http::trace::MakeSpan for MakeSpanSimple { } } -/// Span maker which records the default traces, those given by the user and extract a propagation context +/// Span maker which records the span given by the user and extracts a propagation context /// from the request headers. #[derive(Clone)] pub struct MakeSpanPropagation { @@ -164,7 +164,7 @@ impl tower_http::trace::OnResponse for OnResponseStatusCode { } } -/// Simple macro to record the following default for each request: +/// Simple macro to record the following defaults for each request: /// - The URI /// - The method /// - The status code From 3311d2b149d6d28f5a6fecf7121e93238ed323df Mon Sep 17 00:00:00 2001 From: Pieter Date: Wed, 15 Feb 2023 12:19:24 +0200 Subject: [PATCH 7/8] Update gateway/src/api/latest.rs --- gateway/src/api/latest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index d2403b995..4863439f4 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -439,7 +439,7 @@ impl ApiBuilder { TraceLayer::new(|request| { request_span!( request, - account_name = field::Empty, + account.name = field::Empty, request.params.project_name = field::Empty, request.params.account_name = field::Empty ) From 968ff6f773cfdb81de5f905129f871a4f8f6eb8e Mon Sep 17 00:00:00 2001 From: chesedo Date: Wed, 15 Feb 2023 13:41:23 +0200 Subject: [PATCH 8/8] tests: more deterministic --- Cargo.lock | 1 + common/Cargo.toml | 1 + common/src/backends/metrics.rs | 184 ++++++++++++++++----------------- 3 files changed, 91 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20573c774..0e112bb77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5837,6 +5837,7 @@ dependencies = [ "comfy-table", "crossterm", "http 0.2.8", + "hyper", "once_cell", "opentelemetry", "opentelemetry-http", diff --git a/common/Cargo.toml b/common/Cargo.toml index 87709962d..ba239cf02 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -35,6 +35,7 @@ models = ["anyhow", "async-trait", "display", "http", "reqwest", "serde_json"] [dev-dependencies] axum = { workspace = true } +hyper = "0.14.23" tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] } tower = { version = "0.4", features = ["util"] } tracing-fluent-assertions = "0.3.0" diff --git a/common/src/backends/metrics.rs b/common/src/backends/metrics.rs index 800d654b9..9d7fc2b77 100644 --- a/common/src/backends/metrics.rs +++ b/common/src/backends/metrics.rs @@ -198,14 +198,11 @@ macro_rules! request_span { #[cfg(test)] mod tests { - use std::time::Duration; - use axum::{ - body::Body, extract::Path, http::Request, middleware::from_extractor, + body::Body, extract::Path, http::Request, http::StatusCode, middleware::from_extractor, response::IntoResponse, routing::get, Router, }; - use once_cell::sync::OnceCell; - use tokio::time::sleep; + use hyper::body; use tower::ServiceExt; use tracing::field; use tracing_fluent_assertions::{AssertionRegistry, AssertionsLayer}; @@ -221,99 +218,96 @@ mod tests { format!("hello {user_name}") } - fn get_assertion_registry() -> &'static AssertionRegistry { - static REGISTRY: OnceCell = OnceCell::new(); - REGISTRY.get_or_init(|| { - let assertion_registry = AssertionRegistry::default(); - let base_subscriber = Registry::default(); - let subscriber = base_subscriber.with(AssertionsLayer::new(&assertion_registry)); - tracing::subscriber::set_global_default(subscriber).unwrap(); + #[tokio::test] + async fn trace_layer() { + let assertion_registry = AssertionRegistry::default(); + let base_subscriber = Registry::default(); + let subscriber = base_subscriber.with(AssertionsLayer::new(&assertion_registry)); + tracing::subscriber::set_global_default(subscriber).unwrap(); - assertion_registry - }) - } + // Put in own block to make sure assertion to not interfere with the next test + { + let router: Router<()> = Router::new() + .route("/hello", get(hello)) + .route_layer(from_extractor::()) + .layer( + TraceLayer::new(|request| request_span!(request)) + .without_propagation() + .build(), + ); + + let request_span = assertion_registry + .build() + .with_name("request") + .with_span_field("http.uri") + .with_span_field("http.method") + .with_span_field("http.status_code") + .with_span_field("request.path") + .was_closed() + .finalize(); + + let response = router + .oneshot( + Request::builder() + .uri("/hello") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = body::to_bytes(response.into_body()).await.unwrap(); + + assert_eq!(&body[..], b"hello"); + request_span.assert(); + } - #[tokio::test] - async fn basic() { - let assertion_registry = get_assertion_registry(); - let router: Router<()> = Router::new() - .route("/hello", get(hello)) - .route_layer(from_extractor::()) - .layer( - TraceLayer::new(|request| request_span!(request)) + { + let router: Router<()> = Router::new() + .route("/hello/:user_name", get(hello_user)) + .route_layer(from_extractor::()) + .layer( + TraceLayer::new(|request| { + request_span!( + request, + request.params.user_name = field::Empty, + extra = "value" + ) + }) .without_propagation() .build(), - ); - - let request_span = assertion_registry - .build() - .with_name("request") - .with_span_field("http.uri") - .with_span_field("http.method") - .with_span_field("http.status_code") - .with_span_field("request.path") - .was_created() - .finalize(); - - router - .oneshot( - Request::builder() - .uri("/hello") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - // Give time for the span to be recorded - sleep(Duration::from_millis(100)).await; - - request_span.assert(); - } - - #[tokio::test] - async fn complex() { - let assertion_registry = get_assertion_registry(); - let router: Router<()> = Router::new() - .route("/hello/:user_name", get(hello_user)) - .route_layer(from_extractor::()) - .layer( - TraceLayer::new(|request| { - request_span!( - request, - request.params.user_name = field::Empty, - extra = "value" - ) - }) - .without_propagation() - .build(), - ); - - let request_span = assertion_registry - .build() - .with_name("request") - .with_span_field("http.uri") - .with_span_field("http.method") - .with_span_field("http.status_code") - .with_span_field("request.path") - .with_span_field("request.params.user_name") - .with_span_field("extra") - .was_created() - .finalize(); - - router - .oneshot( - Request::builder() - .uri("/hello/ferries") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - // Give time for the span to be recorded - sleep(Duration::from_millis(100)).await; - - request_span.assert(); + ); + + let request_span = assertion_registry + .build() + .with_name("request") + .with_span_field("http.uri") + .with_span_field("http.method") + .with_span_field("http.status_code") + .with_span_field("request.path") + .with_span_field("request.params.user_name") + .with_span_field("extra") + .was_closed() + .finalize(); + + let response = router + .oneshot( + Request::builder() + .uri("/hello/ferries") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = body::to_bytes(response.into_body()).await.unwrap(); + + assert_eq!(&body[..], b"hello ferries"); + request_span.assert(); + } } }