Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(sources, sinks): add telemetry to http and grpc servers #18887

Merged
merged 11 commits into from
Oct 27, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ opendal = {version = "0.38", default-features = false, features = ["native-tls",

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip"]}
tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip", "trace"]}
# Serde
serde = { version = "1.0.190", default-features = false, features = ["derive"] }
serde-toml-merge = { version = "0.3.3", default-features = false }
Expand Down
25 changes: 19 additions & 6 deletions lib/vector-core/src/metrics/label_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@ use metrics_tracing_context::LabelFilter;
pub(crate) struct VectorLabelFilter;

impl LabelFilter for VectorLabelFilter {
fn should_include_label(&self, _key: &KeyName, label: &Label) -> bool {
let key = label.key();
key == "component_id"
|| key == "component_type"
|| key == "component_kind"
|| key == "buffer_type"
fn should_include_label(&self, metric_key: &KeyName, label: &Label) -> bool {
let label_key = label.key();
// HTTP Server-specific labels
if metric_key.as_str().starts_with("http_server_")
&& (label_key == "method" || label_key == "path")
{
return true;
}
// gRPC Server-specific labels
if metric_key.as_str().starts_with("grpc_server_")
&& (label_key == "grpc_method" || label_key == "grpc_service")
{
return true;
}
// Global labels
label_key == "component_id"
|| label_key == "component_type"
|| label_key == "component_kind"
|| label_key == "buffer_type"
}
}
45 changes: 32 additions & 13 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ use async_graphql::{
Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tower::ServiceBuilder;
use tracing::Span;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};

use super::{handler, schema, ShutdownTx};
use crate::{
config,
http::build_http_trace_layer,
internal_events::{SocketBindError, SocketMode},
topology,
};
Expand All @@ -39,20 +43,35 @@ impl Server {
let (_shutdown, rx) = oneshot::channel();
// warp uses `tokio::spawn` and so needs us to enter the runtime context.
let _guard = handle.enter();
let (addr, server) = warp::serve(routes)
.try_bind_with_graceful_shutdown(
config.api.address.expect("No socket address"),
async {

let addr = config.api.address.expect("No socket address");
let incoming = AddrIncoming::bind(&addr).map_err(|error| {
emit!(SocketBindError {
mode: SocketMode::Tcp,
error: &error,
});
error
})?;

let span = Span::current();
let make_svc = make_service_fn(move |_conn| {
let svc = ServiceBuilder::new()
.layer(build_http_trace_layer(span.clone()))
.service(warp::service(routes.clone()));
futures_util::future::ok::<_, Infallible>(svc)
});

let server = async move {
HyperServer::builder(incoming)
.serve(make_svc)
.with_graceful_shutdown(async {
rx.await.ok();
},
)
.map_err(|error| {
emit!(SocketBindError {
mode: SocketMode::Tcp,
error: &error,
});
error
})?;
})
.await
.map_err(|err| {
error!("An error occurred: {:?}.", err);
})
};

// Update component schema with the config before starting the server.
schema::components::update_config(config);
Expand Down
50 changes: 47 additions & 3 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
use std::{
fmt,
task::{Context, Poll},
time::Duration,
};

use futures::future::BoxFuture;
use headers::{Authorization, HeaderMapExt};
use http::{header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Uri};
use http::{
header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Response, Uri,
};
use hyper::{
body::{Body, HttpBody},
client,
Expand All @@ -16,13 +19,17 @@ use hyper_openssl::HttpsConnector;
use hyper_proxy::ProxyConnector;
use snafu::{ResultExt, Snafu};
use tower::Service;
use tracing::Instrument;
use tower_http::{
classify::{ServerErrorsAsFailures, SharedClassifier},
trace::TraceLayer,
};
use tracing::{Instrument, Span};
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;

use crate::{
config::ProxyConfig,
internal_events::http_client,
internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent},
tls::{tls_connector_builder, MaybeTlsSettings, TlsError},
};

Expand Down Expand Up @@ -338,6 +345,43 @@ pub fn get_http_scheme_from_uri(uri: &Uri) -> &'static str {
})
}

/// Builds a [TraceLayer] configured for a HTTP server.
dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved
///
/// This layer emits HTTP specific telemetry for requests received, responses sent, and handler duration.
pub fn build_http_trace_layer(
span: Span,
) -> TraceLayer<
SharedClassifier<ServerErrorsAsFailures>,
impl Fn(&Request<Body>) -> Span + Clone,
impl Fn(&Request<Body>, &Span) + Clone,
impl Fn(&Response<Body>, Duration, &Span) + Clone,
(),
(),
(),
> {
TraceLayer::new_for_http()
.make_span_with(move |request: &Request<Body>| {
// This is an error span so that the labels are always present for metrics.
error_span!(
parent: &span,
"http-request",
method = %request.method(),
path = %request.uri().path(),
)
})
.on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
emit!(HttpServerRequestReceived);
}))
.on_response(
|response: &Response<Body>, latency: Duration, _span: &Span| {
emit!(HttpServerResponseSent { response, latency });
},
)
.on_failure(())
.on_body_chunk(())
.on_eos(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
61 changes: 60 additions & 1 deletion src/internal_events/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,44 @@
use metrics::counter;
use std::time::Duration;

use http::response::Response;
use metrics::{counter, histogram};
use tonic::Code;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type};

const GRPC_STATUS_LABEL: &str = "grpc_status";

#[derive(Debug)]
pub struct GrpcServerRequestReceived;

impl InternalEvent for GrpcServerRequestReceived {
fn emit(self) {
counter!("grpc_server_messages_received_total", 1);
}
}

#[derive(Debug)]
pub struct GrpcServerResponseSent<'a, B> {
pub response: &'a Response<B>,
pub latency: Duration,
}

impl<'a, B> InternalEvent for GrpcServerResponseSent<'a, B> {
fn emit(self) {
let grpc_code = self
.response
.headers()
.get("grpc-status")
// The header value is missing on success.
.map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes()));
let grpc_code = grpc_code_to_name(grpc_code);

let labels = &[(GRPC_STATUS_LABEL, grpc_code)];
counter!("grpc_server_messages_sent_total", 1, labels);
histogram!("grpc_server_handler_duration_seconds", self.latency, labels);
}
}

#[derive(Debug)]
pub struct GrpcInvalidCompressionSchemeError<'a> {
pub status: &'a tonic::Status,
Expand Down Expand Up @@ -48,3 +85,25 @@ where
);
}
}

const fn grpc_code_to_name(code: Code) -> &'static str {
match code {
Code::Ok => "Ok",
Code::Cancelled => "Cancelled",
Code::Unknown => "Unknown",
Code::InvalidArgument => "InvalidArgument",
Code::DeadlineExceeded => "DeadlineExceeded",
Code::NotFound => "NotFound",
Code::AlreadyExists => "AlreadyExists",
Code::PermissionDenied => "PermissionDenied",
Code::ResourceExhausted => "ResourceExhausted",
Code::FailedPrecondition => "FailedPrecondition",
Code::Aborted => "Aborted",
Code::OutOfRange => "OutOfRange",
Code::Unimplemented => "Unimplemented",
Code::Internal => "Internal",
Code::Unavailable => "Unavailable",
Code::DataLoss => "DataLoss",
Code::Unauthenticated => "Unauthenticated",
}
}
31 changes: 30 additions & 1 deletion src/internal_events/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::error::Error;
use std::{error::Error, time::Duration};

use http::Response;
use metrics::{counter, histogram};
use vector_lib::internal_event::InternalEvent;

Expand All @@ -8,6 +9,34 @@ use vector_lib::{
json_size::JsonSize,
};

const HTTP_STATUS_LABEL: &str = "status";

#[derive(Debug)]
pub struct HttpServerRequestReceived;

impl InternalEvent for HttpServerRequestReceived {
fn emit(self) {
counter!("http_server_requests_received_total", 1);
}
}

#[derive(Debug)]
pub struct HttpServerResponseSent<'a, B> {
pub response: &'a Response<B>,
pub latency: Duration,
}

impl<'a, B> InternalEvent for HttpServerResponseSent<'a, B> {
fn emit(self) {
let labels = &[(
HTTP_STATUS_LABEL,
self.response.status().as_u16().to_string(),
)];
counter!("http_server_responses_sent_total", 1, labels);
histogram!("http_server_handler_duration_seconds", self.latency, labels);
}
}

#[derive(Debug)]
pub struct HttpBytesReceived<'a> {
pub byte_size: usize,
Expand Down
9 changes: 1 addition & 8 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,6 @@ pub(crate) use self::gcp_pubsub::*;
pub(crate) use self::grpc::*;
#[cfg(feature = "sources-host_metrics")]
pub(crate) use self::host_metrics::*;
#[cfg(any(
feature = "sources-utils-http",
feature = "sources-utils-http-encoding",
feature = "sources-datadog_agent",
feature = "sources-splunk_hec",
))]
pub(crate) use self::http::*;
#[cfg(feature = "sources-utils-http-client")]
pub(crate) use self::http_client_source::*;
#[cfg(feature = "sinks-influxdb")]
Expand Down Expand Up @@ -268,7 +261,7 @@ pub(crate) use self::websocket::*;
pub(crate) use self::windows::*;
pub use self::{
adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*,
heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
};

// this version won't be needed once all `InternalEvent`s implement `name()`
Expand Down
27 changes: 15 additions & 12 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use indexmap::{map::Entry, IndexMap};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
use tower::ServiceBuilder;
use tracing::{Instrument, Span};
use vector_lib::configurable::configurable_component;
use vector_lib::{
Expand All @@ -36,7 +37,7 @@ use crate::{
metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
Event, EventStatus, Finalizable,
},
http::Auth,
http::{build_http_trace_layer, Auth},
internal_events::{PrometheusNormalizationError, PrometheusServerRequestComplete},
sinks::{
util::{
Expand Down Expand Up @@ -485,19 +486,21 @@ impl PrometheusExporter {
let metrics = Arc::clone(&metrics);
let handler = handler.clone();

async move {
Ok::<_, Infallible>(service_fn(move |req| {
span.in_scope(|| {
let response = handler.handle(req, &metrics);
let inner = service_fn(move |req| {
let response = handler.handle(req, &metrics);

emit!(PrometheusServerRequestComplete {
status_code: response.status(),
});
emit!(PrometheusServerRequestComplete {
status_code: response.status(),
});

future::ok::<_, Infallible>(response)
})
}))
}
future::ok::<_, Infallible>(response)
});

let service = ServiceBuilder::new()
.layer(build_http_trace_layer(span.clone()))
.service(inner);

async move { Ok::<_, Infallible>(service) }
});

let (trigger, tripwire) = Tripwire::new();
Expand Down
Loading
Loading