diff --git a/Cargo.toml b/Cargo.toml index bfffda55b751c..b9ac75053d9ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -197,7 +197,7 @@ opendal = {version = "0.38", default-features = false, features = ["native-tls", # Tower tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] } -tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip"]} +tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip", "trace"]} # Serde serde = { version = "1.0.190", default-features = false, features = ["derive"] } serde-toml-merge = { version = "0.3.3", default-features = false } diff --git a/lib/vector-core/src/metrics/label_filter.rs b/lib/vector-core/src/metrics/label_filter.rs index 48cb1513752ac..8390620502a47 100644 --- a/lib/vector-core/src/metrics/label_filter.rs +++ b/lib/vector-core/src/metrics/label_filter.rs @@ -5,11 +5,24 @@ use metrics_tracing_context::LabelFilter; pub(crate) struct VectorLabelFilter; impl LabelFilter for VectorLabelFilter { - fn should_include_label(&self, _key: &KeyName, label: &Label) -> bool { - let key = label.key(); - key == "component_id" - || key == "component_type" - || key == "component_kind" - || key == "buffer_type" + fn should_include_label(&self, metric_key: &KeyName, label: &Label) -> bool { + let label_key = label.key(); + // HTTP Server-specific labels + if metric_key.as_str().starts_with("http_server_") + && (label_key == "method" || label_key == "path") + { + return true; + } + // gRPC Server-specific labels + if metric_key.as_str().starts_with("grpc_server_") + && (label_key == "grpc_method" || label_key == "grpc_service") + { + return true; + } + // Global labels + label_key == "component_id" + || label_key == "component_type" + || label_key == "component_kind" + || label_key == "buffer_type" } } diff --git a/src/api/server.rs b/src/api/server.rs index 935c0b03ecaa6..244b2c25c9b00 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -9,13 +9,17 @@ use async_graphql::{ Data, Request, Schema, }; use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket}; +use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer}; use tokio::runtime::Handle; use tokio::sync::oneshot; +use tower::ServiceBuilder; +use tracing::Span; use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply}; use super::{handler, schema, ShutdownTx}; use crate::{ config, + http::build_http_trace_layer, internal_events::{SocketBindError, SocketMode}, topology, }; @@ -39,20 +43,35 @@ impl Server { let (_shutdown, rx) = oneshot::channel(); // warp uses `tokio::spawn` and so needs us to enter the runtime context. let _guard = handle.enter(); - let (addr, server) = warp::serve(routes) - .try_bind_with_graceful_shutdown( - config.api.address.expect("No socket address"), - async { + + let addr = config.api.address.expect("No socket address"); + let incoming = AddrIncoming::bind(&addr).map_err(|error| { + emit!(SocketBindError { + mode: SocketMode::Tcp, + error: &error, + }); + error + })?; + + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + let server = async move { + HyperServer::builder(incoming) + .serve(make_svc) + .with_graceful_shutdown(async { rx.await.ok(); - }, - ) - .map_err(|error| { - emit!(SocketBindError { - mode: SocketMode::Tcp, - error: &error, - }); - error - })?; + }) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + }) + }; // Update component schema with the config before starting the server. schema::components::update_config(config); diff --git a/src/http.rs b/src/http.rs index 39ed37053a786..646b8074d392f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -2,11 +2,14 @@ use std::{ fmt, task::{Context, Poll}, + time::Duration, }; use futures::future::BoxFuture; use headers::{Authorization, HeaderMapExt}; -use http::{header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Uri}; +use http::{ + header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Response, Uri, +}; use hyper::{ body::{Body, HttpBody}, client, @@ -16,13 +19,17 @@ use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; use snafu::{ResultExt, Snafu}; use tower::Service; -use tracing::Instrument; +use tower_http::{ + classify::{ServerErrorsAsFailures, SharedClassifier}, + trace::TraceLayer, +}; +use tracing::{Instrument, Span}; use vector_lib::configurable::configurable_component; use vector_lib::sensitive_string::SensitiveString; use crate::{ config::ProxyConfig, - internal_events::http_client, + internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent}, tls::{tls_connector_builder, MaybeTlsSettings, TlsError}, }; @@ -338,6 +345,43 @@ pub fn get_http_scheme_from_uri(uri: &Uri) -> &'static str { }) } +/// Builds a [TraceLayer] configured for a HTTP server. +/// +/// This layer emits HTTP specific telemetry for requests received, responses sent, and handler duration. +pub fn build_http_trace_layer( + span: Span, +) -> TraceLayer< + SharedClassifier, + impl Fn(&Request) -> Span + Clone, + impl Fn(&Request, &Span) + Clone, + impl Fn(&Response, Duration, &Span) + Clone, + (), + (), + (), +> { + TraceLayer::new_for_http() + .make_span_with(move |request: &Request| { + // This is an error span so that the labels are always present for metrics. + error_span!( + parent: &span, + "http-request", + method = %request.method(), + path = %request.uri().path(), + ) + }) + .on_request(Box::new(|_request: &Request, _span: &Span| { + emit!(HttpServerRequestReceived); + })) + .on_response( + |response: &Response, latency: Duration, _span: &Span| { + emit!(HttpServerResponseSent { response, latency }); + }, + ) + .on_failure(()) + .on_body_chunk(()) + .on_eos(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/internal_events/grpc.rs b/src/internal_events/grpc.rs index ee18a1bd225db..ea3fae807df40 100644 --- a/src/internal_events/grpc.rs +++ b/src/internal_events/grpc.rs @@ -1,7 +1,44 @@ -use metrics::counter; +use std::time::Duration; + +use http::response::Response; +use metrics::{counter, histogram}; +use tonic::Code; use vector_lib::internal_event::InternalEvent; use vector_lib::internal_event::{error_stage, error_type}; +const GRPC_STATUS_LABEL: &str = "grpc_status"; + +#[derive(Debug)] +pub struct GrpcServerRequestReceived; + +impl InternalEvent for GrpcServerRequestReceived { + fn emit(self) { + counter!("grpc_server_messages_received_total", 1); + } +} + +#[derive(Debug)] +pub struct GrpcServerResponseSent<'a, B> { + pub response: &'a Response, + pub latency: Duration, +} + +impl<'a, B> InternalEvent for GrpcServerResponseSent<'a, B> { + fn emit(self) { + let grpc_code = self + .response + .headers() + .get("grpc-status") + // The header value is missing on success. + .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes())); + let grpc_code = grpc_code_to_name(grpc_code); + + let labels = &[(GRPC_STATUS_LABEL, grpc_code)]; + counter!("grpc_server_messages_sent_total", 1, labels); + histogram!("grpc_server_handler_duration_seconds", self.latency, labels); + } +} + #[derive(Debug)] pub struct GrpcInvalidCompressionSchemeError<'a> { pub status: &'a tonic::Status, @@ -48,3 +85,25 @@ where ); } } + +const fn grpc_code_to_name(code: Code) -> &'static str { + match code { + Code::Ok => "Ok", + Code::Cancelled => "Cancelled", + Code::Unknown => "Unknown", + Code::InvalidArgument => "InvalidArgument", + Code::DeadlineExceeded => "DeadlineExceeded", + Code::NotFound => "NotFound", + Code::AlreadyExists => "AlreadyExists", + Code::PermissionDenied => "PermissionDenied", + Code::ResourceExhausted => "ResourceExhausted", + Code::FailedPrecondition => "FailedPrecondition", + Code::Aborted => "Aborted", + Code::OutOfRange => "OutOfRange", + Code::Unimplemented => "Unimplemented", + Code::Internal => "Internal", + Code::Unavailable => "Unavailable", + Code::DataLoss => "DataLoss", + Code::Unauthenticated => "Unauthenticated", + } +} diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index bcef588f533ff..04a059cc5ace2 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -1,5 +1,6 @@ -use std::error::Error; +use std::{error::Error, time::Duration}; +use http::Response; use metrics::{counter, histogram}; use vector_lib::internal_event::InternalEvent; @@ -8,6 +9,34 @@ use vector_lib::{ json_size::JsonSize, }; +const HTTP_STATUS_LABEL: &str = "status"; + +#[derive(Debug)] +pub struct HttpServerRequestReceived; + +impl InternalEvent for HttpServerRequestReceived { + fn emit(self) { + counter!("http_server_requests_received_total", 1); + } +} + +#[derive(Debug)] +pub struct HttpServerResponseSent<'a, B> { + pub response: &'a Response, + pub latency: Duration, +} + +impl<'a, B> InternalEvent for HttpServerResponseSent<'a, B> { + fn emit(self) { + let labels = &[( + HTTP_STATUS_LABEL, + self.response.status().as_u16().to_string(), + )]; + counter!("http_server_responses_sent_total", 1, labels); + histogram!("http_server_handler_duration_seconds", self.latency, labels); + } +} + #[derive(Debug)] pub struct HttpBytesReceived<'a> { pub byte_size: usize, diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 3e5a860a918bb..329d7a1d291a6 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -197,13 +197,6 @@ pub(crate) use self::gcp_pubsub::*; pub(crate) use self::grpc::*; #[cfg(feature = "sources-host_metrics")] pub(crate) use self::host_metrics::*; -#[cfg(any( - feature = "sources-utils-http", - feature = "sources-utils-http-encoding", - feature = "sources-datadog_agent", - feature = "sources-splunk_hec", -))] -pub(crate) use self::http::*; #[cfg(feature = "sources-utils-http-client")] pub(crate) use self::http_client_source::*; #[cfg(feature = "sinks-influxdb")] @@ -268,7 +261,7 @@ pub(crate) use self::websocket::*; pub(crate) use self::windows::*; pub use self::{ adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*, - heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, + heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, }; // this version won't be needed once all `InternalEvent`s implement `name()` diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index ecbcf6537b0c3..412e3d69b9517 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -19,6 +19,7 @@ use indexmap::{map::Entry, IndexMap}; use serde_with::serde_as; use snafu::Snafu; use stream_cancel::{Trigger, Tripwire}; +use tower::ServiceBuilder; use tracing::{Instrument, Span}; use vector_lib::configurable::configurable_component; use vector_lib::{ @@ -36,7 +37,7 @@ use crate::{ metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue}, Event, EventStatus, Finalizable, }, - http::Auth, + http::{build_http_trace_layer, Auth}, internal_events::{PrometheusNormalizationError, PrometheusServerRequestComplete}, sinks::{ util::{ @@ -485,19 +486,21 @@ impl PrometheusExporter { let metrics = Arc::clone(&metrics); let handler = handler.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - span.in_scope(|| { - let response = handler.handle(req, &metrics); + let inner = service_fn(move |req| { + let response = handler.handle(req, &metrics); - emit!(PrometheusServerRequestComplete { - status_code: response.status(), - }); + emit!(PrometheusServerRequestComplete { + status_code: response.status(), + }); - future::ok::<_, Infallible>(response) - }) - })) - } + future::ok::<_, Infallible>(response) + }); + + let service = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(inner); + + async move { Ok::<_, Infallible>(service) } }); let (trigger, tripwire) = Tripwire::new(); diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index b3116c2825b97..7ee3aa58ef18a 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -1,14 +1,15 @@ -use std::{fmt, net::SocketAddr}; +use std::{convert::Infallible, fmt, net::SocketAddr}; use codecs::decoding::{DeserializerConfig, FramingConfig}; use futures::FutureExt; +use hyper::{service::make_service_fn, Server}; use lookup::owned_value_path; +use tower::ServiceBuilder; use tracing::Span; use vector_lib::config::{LegacyKey, LogNamespace}; use vector_lib::configurable::configurable_component; use vector_lib::sensitive_string::SensitiveString; use vrl::value::Kind; -use warp::Filter; use crate::{ codecs::DecodingConfig, @@ -16,6 +17,7 @@ use crate::{ GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, }, + http::build_http_trace_layer, serde::{bool_or_struct, default_decoding, default_framing_message_based}, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; @@ -175,12 +177,21 @@ impl SourceConfig for AwsKinesisFirehoseConfig { let shutdown = cx.shutdown; Ok(Box::pin(async move { let span = Span::current(); - warp::serve(svc.with(warp::trace(move |_info| span.clone()))) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(svc.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + })?; + Ok(()) })) } diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 76686629276d1..6be3f15918470 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -17,6 +17,7 @@ pub(crate) mod ddtrace_proto { include!(concat!(env!("OUT_DIR"), "/dd_trace.rs")); } +use std::convert::Infallible; use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc}; use bytes::{Buf, Bytes}; @@ -25,10 +26,13 @@ use codecs::decoding::{DeserializerConfig, FramingConfig}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; use futures::FutureExt; use http::StatusCode; +use hyper::service::make_service_fn; +use hyper::Server; use lookup::owned_value_path; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_lib::config::{LegacyKey, LogNamespace}; use vector_lib::configurable::configurable_component; @@ -38,6 +42,7 @@ use vrl::path::OwnedTargetPath; use vrl::value::Kind; use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply}; +use crate::http::build_http_trace_layer; use crate::{ codecs::{Decoder, DecodingConfig}, config::{ @@ -175,25 +180,31 @@ impl SourceConfig for DatadogAgentConfig { info!(message = "Building HTTP server.", address = %self.address); Ok(Box::pin(async move { + let routes = filters.recover(|r: Rejection| async move { + if let Some(e_msg) = r.find::() { + let json = warp::reply::json(e_msg); + Ok(warp::reply::with_status(json, e_msg.status_code())) + } else { + // other internal error - will return 500 internal server error + Err(r) + } + }); + let span = Span::current(); - let routes = filters - .with(warp::trace(move |_info| span.clone())) - .recover(|r: Rejection| async move { - if let Some(e_msg) = r.find::() { - let json = warp::reply::json(e_msg); - Ok(warp::reply::with_status(json, e_msg.status_code())) - } else { - // other internal error - will return 500 internal server error - Err(r) - } - }); - - warp::serve(routes) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + })?; Ok(()) })) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 86989f0efd55d..8a2eb0313ca7a 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -1,13 +1,15 @@ -use std::net::SocketAddr; +use std::{convert::Infallible, net::SocketAddr}; use bytes::Bytes; use futures_util::FutureExt; use http::StatusCode; +use hyper::{service::make_service_fn, Server}; use opentelemetry_proto::proto::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, }; use prost::Message; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_lib::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, @@ -21,6 +23,7 @@ use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Rep use crate::{ event::Event, + http::build_http_trace_layer, internal_events::{EventsReceived, StreamClosedError}, shutdown::ShutdownSignal, sources::util::{decode, ErrorMessage}, @@ -44,17 +47,23 @@ pub(crate) async fn run_http_server( filters: BoxedFilter<(Response,)>, shutdown: ShutdownSignal, ) -> crate::Result<()> { - let span = Span::current(); let listener = tls_settings.bind(&address).await?; - let routes = filters - .with(warp::trace(move |_info| span.clone())) - .recover(handle_rejection); + let routes = filters.recover(handle_rejection); info!(message = "Building HTTP server.", address = %address); - warp::serve(routes) - .serve_incoming_with_graceful_shutdown(listener.accept_stream(), shutdown.map(|_| ())) - .await; + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await?; Ok(()) } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 7e666796a8177..2757d4a9d7962 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + convert::Infallible, io::Read, net::{Ipv4Addr, SocketAddr}, sync::Arc, @@ -10,11 +11,13 @@ use chrono::{DateTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; use futures::FutureExt; use http::StatusCode; +use hyper::{service::make_service_fn, Server}; use lookup::lookup_v2::OptionalValuePath; use lookup::{event_path, owned_value_path}; use serde::Serialize; use serde_json::{de::Read as JsonRead, Deserializer, Value as JsonValue}; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_lib::configurable::configurable_component; use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; @@ -38,6 +41,7 @@ use self::{ use crate::{ config::{log_schema, DataType, Resource, SourceConfig, SourceContext, SourceOutput}, event::{Event, LogEvent, Value}, + http::build_http_trace_layer, internal_events::{ EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError, SplunkHecRequestReceived, @@ -166,12 +170,20 @@ impl SourceConfig for SplunkConfig { Ok(Box::pin(async move { let span = Span::current(); - warp::serve(services.with(warp::trace(move |_info| span.clone()))) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(services.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + })?; Ok(()) })) diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index 68c54c5d2a0d6..c1976a4ac8fb5 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -1,17 +1,22 @@ use crate::{ + internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent}, shutdown::{ShutdownSignal, ShutdownSignalToken}, tls::MaybeTlsSettings, }; use futures::FutureExt; use http::{Request, Response}; use hyper::Body; -use std::{convert::Infallible, net::SocketAddr}; +use std::{convert::Infallible, net::SocketAddr, time::Duration}; use tonic::{ body::BoxBody, transport::server::{NamedService, Server}, }; use tower::Service; -use tracing::{Instrument, Span}; +use tower_http::{ + classify::{GrpcErrorsAsFailures, SharedClassifier}, + trace::TraceLayer, +}; +use tracing::Span; mod decompression; pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer}; @@ -38,7 +43,7 @@ where info!(%address, "Building gRPC server."); Server::builder() - .trace_fn(move |_| span.clone()) + .layer(build_grpc_trace_layer(span.clone())) // This layer explicitly decompresses payloads, if compressed, and reports the number of message bytes we've // received if the message is processed successfully, aka `BytesReceived`. We do this because otherwise the only // access we have is either the event-specific bytes (the in-memory representation) or the raw bytes over the @@ -51,10 +56,51 @@ where .layer(DecompressionAndMetricsLayer) .add_service(service) .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap())) - .in_current_span() .await?; drop(rx.await); Ok(()) } + +/// Builds a [TraceLayer] configured for a gRPC server. +/// +/// This layer emits gPRC specific telemetry for messages received/sent and handler duration. +pub fn build_grpc_trace_layer( + span: Span, +) -> TraceLayer< + SharedClassifier, + impl Fn(&Request) -> Span + Clone, + impl Fn(&Request, &Span) + Clone, + impl Fn(&Response, Duration, &Span) + Clone, + (), + (), + (), +> { + TraceLayer::new_for_grpc() + .make_span_with(move |request: &Request| { + // The path is defined as “/” {service name} “/” {method name}. + let mut path = request.uri().path().split('/'); + let service = path.nth(1).unwrap_or("_unknown"); + let method = path.next().unwrap_or("_unknown"); + + // This is an error span so that the labels are always present for metrics. + error_span!( + parent: &span, + "grpc-request", + grpc_service = service, + grpc_method = method, + ) + }) + .on_request(Box::new(|_request: &Request, _span: &Span| { + emit!(GrpcServerRequestReceived); + })) + .on_response( + |response: &Response, latency: Duration, _span: &Span| { + emit!(GrpcServerResponseSent { response, latency }); + }, + ) + .on_failure(()) + .on_body_chunk(()) + .on_eos(()) +} diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 42482ef00aea4..1523427f392a4 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -1,13 +1,20 @@ -use std::{collections::HashMap, convert::TryFrom, fmt, net::SocketAddr}; -use vector_lib::EstimatedJsonEncodedSizeOf; +use std::{ + collections::HashMap, + convert::{Infallible, TryFrom}, + fmt, + net::SocketAddr, +}; use async_trait::async_trait; use bytes::Bytes; use futures::{FutureExt, TryFutureExt}; +use hyper::{service::make_service_fn, Server}; +use tower::ServiceBuilder; use tracing::Span; use vector_lib::{ config::SourceAcknowledgementsConfig, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event}, + EstimatedJsonEncodedSizeOf, }; use warp::{ filters::{ @@ -21,6 +28,7 @@ use warp::{ use crate::{ config::SourceContext, + http::build_http_trace_layer, internal_events::{ HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError, }, @@ -76,7 +84,6 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let path = path.to_owned(); let acknowledgements = cx.do_acknowledgements(acknowledgements); Ok(Box::pin(async move { - let span = Span::current(); let mut filter: BoxedFilter<()> = match method { HttpMethod::Head => warp::head().boxed(), HttpMethod::Get => warp::get().boxed(), @@ -155,8 +162,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { handle_request(events, acknowledgements, response_code, cx.out.clone()) }, - ) - .with(warp::trace(move |_info| span.clone())); + ); let ping = warp::get().and(warp::path("ping")).map(|| "pong"); let routes = svc.or(ping).recover(|r: Rejection| async move { @@ -172,22 +178,28 @@ pub trait HttpSource: Clone + Send + Sync + 'static { } }); + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + info!(message = "Building HTTP server.", address = %address); - match tls.bind(&address).await { - Ok(listener) => { - warp::serve(routes) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - cx.shutdown.map(|_| ()), - ) - .await; - } - Err(error) => { - error!("An error occurred: {:?}.", error); - return Err(()); - } - } + let listener = tls.bind(&address).await.map_err(|err| { + error!("An error occurred: {:?}.", err); + })?; + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(cx.shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + })?; + Ok(()) })) } diff --git a/website/cue/reference/components/sinks/prometheus_exporter.cue b/website/cue/reference/components/sinks/prometheus_exporter.cue index d4b79f88f23c3..7e7916ae2c47b 100644 --- a/website/cue/reference/components/sinks/prometheus_exporter.cue +++ b/website/cue/reference/components/sinks/prometheus_exporter.cue @@ -314,4 +314,10 @@ components: sinks: prometheus_exporter: { """ } } + + telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/aws_kinesis_firehose.cue index 22adc0ecb28e4..32c446c847887 100644 --- a/website/cue/reference/components/sources/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/aws_kinesis_firehose.cue @@ -187,6 +187,9 @@ components: sources: aws_kinesis_firehose: { } telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total request_automatic_decode_errors_total: components.sources.internal_metrics.output.metrics.request_automatic_decode_errors_total diff --git a/website/cue/reference/components/sources/datadog_agent.cue b/website/cue/reference/components/sources/datadog_agent.cue index 88e5cd9cd429f..9c6b220af87a8 100644 --- a/website/cue/reference/components/sources/datadog_agent.cue +++ b/website/cue/reference/components/sources/datadog_agent.cue @@ -217,4 +217,10 @@ components: sources: datadog_agent: { """ } } + + telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/heroku_logs.cue b/website/cue/reference/components/sources/heroku_logs.cue index 038fe51b8164b..5f049a08193d5 100644 --- a/website/cue/reference/components/sources/heroku_logs.cue +++ b/website/cue/reference/components/sources/heroku_logs.cue @@ -101,7 +101,10 @@ components: sources: heroku_logs: { } telemetry: metrics: { - request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total } } diff --git a/website/cue/reference/components/sources/http_server.cue b/website/cue/reference/components/sources/http_server.cue index 87055622b5ec7..36edbaece2683 100644 --- a/website/cue/reference/components/sources/http_server.cue +++ b/website/cue/reference/components/sources/http_server.cue @@ -178,8 +178,11 @@ components: sources: http_server: { ] telemetry: metrics: { - http_bad_requests_total: components.sources.internal_metrics.output.metrics.http_bad_requests_total - parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + http_bad_requests_total: components.sources.internal_metrics.output.metrics.http_bad_requests_total + parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total } how_it_works: { diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 8a0bc527cddc3..920ca9d77ae93 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -721,6 +721,35 @@ components: sources: internal_metrics: { path: _path } } + grpc_server_messages_received_total: { + description: "The total number of gRPC messages received." + type: "counter" + default_namespace: "vector" + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + } + } + grpc_server_messages_sent_total: { + description: "The total number of gRPC messages sent." + type: "counter" + default_namespace: "vector" + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + grpc_status: _grpc_status + } + } + grpc_server_handler_duration_seconds: { + description: "The duration spent handling a gRPC request." + type: "histogram" + default_namespace: "vector" + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + grpc_status: _grpc_status + } + } http_bad_requests_total: { description: "The total number of HTTP `400 Bad Request` errors encountered." type: "counter" @@ -775,6 +804,35 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + http_server_requests_received_total: { + description: "The total number of HTTP requests received." + type: "counter" + default_namespace: "vector" + tags: _component_tags & { + method: _method + path: _path + } + } + http_server_responses_sent_total: { + description: "The total number of HTTP responses sent." + type: "counter" + default_namespace: "vector" + tags: _component_tags & { + method: _method + path: _path + status: _status + } + } + http_server_handler_duration_seconds: { + description: "The duration spent handling a HTTP request." + type: "histogram" + default_namespace: "vector" + tags: _component_tags & { + method: _method + path: _path + status: _status + } + } invalid_record_total: { description: "The total number of invalid records that have been discarded." type: "counter" @@ -1159,6 +1217,18 @@ components: sources: internal_metrics: { description: "The file that produced the error" required: false } + _grpc_method: { + description: "The name of the method called on the gRPC service." + required: true + } + _grpc_service: { + description: "The gRPC service name." + required: true + } + _grpc_status: { + description: "The human-readable [gRPC status code](\(urls.grpc_status_code))." + required: true + } _host: { description: "The hostname of the originating system." required: true diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index d86d43faf5ad2..415039485ced6 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -205,4 +205,13 @@ components: sources: opentelemetry: { """ } } + + telemetry: metrics: { + grpc_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.grpc_server_handler_duration_seconds + grpc_server_messages_received_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_received_total + grpc_server_messages_sent_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_sent_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/prometheus_remote_write.cue b/website/cue/reference/components/sources/prometheus_remote_write.cue index ce004a3595c95..9c6362e6ecf6e 100644 --- a/website/cue/reference/components/sources/prometheus_remote_write.cue +++ b/website/cue/reference/components/sources/prometheus_remote_write.cue @@ -84,9 +84,12 @@ components: sources: prometheus_remote_write: { } telemetry: metrics: { - parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total - requests_completed_total: components.sources.internal_metrics.output.metrics.requests_completed_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total - request_duration_seconds: components.sources.internal_metrics.output.metrics.request_duration_seconds + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total + requests_completed_total: components.sources.internal_metrics.output.metrics.requests_completed_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + request_duration_seconds: components.sources.internal_metrics.output.metrics.request_duration_seconds } } diff --git a/website/cue/reference/components/sources/splunk_hec.cue b/website/cue/reference/components/sources/splunk_hec.cue index c1334cb3792da..5671387a87fbd 100644 --- a/website/cue/reference/components/sources/splunk_hec.cue +++ b/website/cue/reference/components/sources/splunk_hec.cue @@ -79,8 +79,11 @@ components: sources: splunk_hec: { } telemetry: metrics: { - http_request_errors_total: components.sources.internal_metrics.output.metrics.http_request_errors_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + http_request_errors_total: components.sources.internal_metrics.output.metrics.http_request_errors_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total } how_it_works: { diff --git a/website/cue/reference/components/sources/vector.cue b/website/cue/reference/components/sources/vector.cue index 87737a272480f..bccc77d956b66 100644 --- a/website/cue/reference/components/sources/vector.cue +++ b/website/cue/reference/components/sources/vector.cue @@ -100,6 +100,9 @@ components: sources: vector: { } telemetry: metrics: { - protobuf_decode_errors_total: components.sources.internal_metrics.output.metrics.protobuf_decode_errors_total + grpc_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.grpc_server_handler_duration_seconds + grpc_server_messages_received_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_received_total + grpc_server_messages_sent_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_sent_total + protobuf_decode_errors_total: components.sources.internal_metrics.output.metrics.protobuf_decode_errors_total } } diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index bba6a6d352126..eced20ef2fb51 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -239,6 +239,7 @@ urls: { greptimecloud: "https://greptime.cloud" greptimedb: "https://github.com/greptimeteam/greptimedb" greptimedb_grpc: "https://docs.greptime.com/" + grpc_status_code: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html" grok: "https://github.com/daschl/grok/tree/master/patterns" grok_debugger: "https://grokdebug.herokuapp.com/" grok_patterns: "\(github)/daschl/grok/tree/master/patterns"