Skip to content

Commit

Permalink
Pull Admin module out of Metrics
Browse files Browse the repository at this point in the history
This pulls the `hyper` server out of the `Metrics` module and move it
into its own `Admin` module that handles metrics, and in the future, the
health/liveness endpoint as well.

Closes #101
  • Loading branch information
markmandel committed Mar 23, 2021
1 parent 35975f1 commit a99ddf5
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 159 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Not to be used in production systems.
* See the [proxy configuration reference](./docs/proxy-configuration.md) for all the configuration options.
* See the [Session documentation](./docs/session.md) for an overview of quilkin sessions and metrics.
* See [Filter documentation](./docs/extensions/filters/filters.md) for a list of filters, and their configuration options.
* The [Administration interface](./docs/admin.md) provides access to health and metrics endpoints.

## Code of Conduct

Expand Down
21 changes: 21 additions & 0 deletions docs/admin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Administration Interface

Quilkin exposes an HTTP interface to query different aspects of the server.

> It is assumed that the administration interface will only ever be able to be accessible on `localhost`.
By default, the administration interface is bound to `[::]:9091`, but it can be configured through the
[proxy configuration file](./proxy-configuration.md), like so:

```yaml
admin:
address: [::]:9095
```
The admin interface provides the following endpoints:
## /metrics
Outputs [Prometheus](https://prometheus.io/) formatted metrics for this proxy.
See the [Proxy Metrics](./proxy.md#metrics) documentation for what metrics are available.
4 changes: 4 additions & 0 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ impl Builder {
Builder { source, ..self }
}

pub fn with_admin(self, admin: Admin) -> Self {
Self { admin, ..self }
}

pub fn build(self) -> Config {
Config {
version: Version::V1Alpha1,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
mod cluster;
pub mod config;
pub mod extensions;
pub mod metrics;
pub(crate) mod metrics;
pub mod proxy;
pub mod runner;
pub mod test_utils;
Expand Down
83 changes: 83 additions & 0 deletions src/proxy/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2021 Google LLC All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode};
use slog::{error, info, o, Logger};
use tokio::sync::watch;

use crate::proxy::Metrics;

pub struct Admin {
log: Logger,
/// The address that the Admin server starts on
addr: SocketAddr,
metrics: Arc<Metrics>,
}

impl Admin {
pub fn new(base: &Logger, addr: SocketAddr, metrics: Arc<Metrics>) -> Self {
Admin {
log: base.new(o!("source" => "proxy::Admin")),
addr,
metrics,
}
}

pub fn run(&self, mut shutdown_rx: watch::Receiver<()>) {
info!(self.log, "Starting admin endpoint"; "address" => self.addr.to_string());

let metrics = self.metrics.clone();
let make_svc = make_service_fn(move |_conn| {
let metrics = metrics.clone();
async move {
let metrics = metrics.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let metrics = metrics.clone();
async move { Ok::<_, Infallible>(handle_request(req, metrics)) }
}))
}
});

let server = HyperServer::bind(&self.addr)
.serve(make_svc)
.with_graceful_shutdown(async move {
shutdown_rx.changed().await.ok();
});

let log = self.log.clone();
tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "Admin server exited with an error"; "error" => %err);
}
});
}
}

fn handle_request(request: Request<Body>, metrics: Arc<Metrics>) -> Response<Body> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => metrics.admin_response(),
(_, _) => {
let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::NOT_FOUND;
response
}
}
}
21 changes: 16 additions & 5 deletions src/proxy/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use crate::config::{
};
use crate::extensions::{default_registry, CreateFilterError, FilterChain, FilterRegistry};
use crate::proxy::server::metrics::Metrics as ProxyMetrics;
use crate::proxy::{Metrics, Server};
use crate::proxy::{Admin as ProxyAdmin, Metrics, Server};
use prometheus::Registry;
use slog::{o, Drain, Logger};
use std::collections::HashSet;
use std::convert::TryInto;
Expand Down Expand Up @@ -108,17 +109,21 @@ pub struct Builder<V> {
log: Logger,
config: Arc<Config>,
filter_registry: FilterRegistry,
metrics: Metrics,
admin: Option<ProxyAdmin>,
metrics: Arc<Metrics>,
validation_status: V,
}

impl From<Arc<Config>> for Builder<PendingValidation> {
fn from(config: Arc<Config>) -> Self {
let log = logger();
let metrics = Arc::new(Metrics::new(&log, Registry::default()));
let admin = ProxyAdmin::new(&log, config.admin.address, metrics.clone());
Builder {
config,
filter_registry: default_registry(&log),
metrics: Metrics::default(),
admin: Some(admin),
metrics,
log,
validation_status: PendingValidation,
}
Expand Down Expand Up @@ -249,8 +254,12 @@ impl Builder<PendingValidation> {
}
}

pub fn with_metrics(self, metrics: Metrics) -> Self {
Self { metrics, ..self }
/// Disable the admin interface
pub fn with_disabled_admin(self) -> Self {
Self {
admin: None,
..self
}
}

// Validates the builder's config and filter configurations.
Expand All @@ -261,6 +270,7 @@ impl Builder<PendingValidation> {
Ok(Builder {
log: self.log,
config: self.config,
admin: self.admin,
metrics: self.metrics,
filter_registry: self.filter_registry,
validation_status: Validated(validated_config),
Expand All @@ -275,6 +285,7 @@ impl Builder<Validated> {
config: Arc::new(self.validation_status.0),
proxy_metrics: ProxyMetrics::new(&self.metrics.registry.clone())
.expect("metrics should be setup properly"),
admin: self.admin,
metrics: self.metrics,
filter_registry: Arc::new(self.filter_registry),
}
Expand Down
150 changes: 54 additions & 96 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,111 +14,27 @@
* limitations under the License.
*/

use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Response, Server as HyperServer, StatusCode};
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{error, info, warn, Logger};
use std::convert::Infallible;
use std::net::SocketAddr;
use tokio::sync::watch::Receiver;

use hyper::{Body, Response, StatusCode};
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{o, warn, Logger};

use crate::proxy::sessions::metrics::Metrics as SessionMetrics;

/// Metrics contains metrics configuration for the server.
#[derive(Clone)]
pub struct Metrics {
/// addr is the socket address on which the server exposes metrics.
/// If none is provided the server does not expose any metrics.
pub addr: Option<SocketAddr>,
pub registry: Registry,
}

/// start_metrics_server starts a HTTP server in the background at `addr` which
/// serves prometheus metrics from `registry`. The server is bounded by `shutdown_signal`,
pub fn start_metrics_server(
addr: SocketAddr,
registry: Registry,
mut shutdown_rx: Receiver<()>,
log: Logger,
) {
info!(log, "Starting metrics"; "address" => %addr);

let handler_log = log.clone();
let make_svc = make_service_fn(move |_conn| {
let registry = registry.clone();
let handler_log = handler_log.clone();
async move {
let registry = registry.clone();
let handler_log = handler_log.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let registry = registry.clone();
let handler_log = handler_log.clone();
async move {
Ok::<_, Infallible>(handle_request(
handler_log,
req.method(),
req.uri().path(),
registry,
))
}
}))
}
});

let server = HyperServer::bind(&addr)
.serve(make_svc)
.with_graceful_shutdown(async move {
shutdown_rx.changed().await.ok();
});

tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "Metrics server exited with an error"; "error" => %err);
}
});
}

fn handle_request(log: Logger, method: &Method, path: &str, registry: Registry) -> Response<Body> {
let mut response = Response::new(Body::empty());

match (method, path) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let body = encoder
.encode(&registry.gather(), &mut buffer)
.map_err(|err| warn!(log, "Failed to encode metrics"; "error" => %err))
.and_then(|_| {
String::from_utf8(buffer).map(Body::from).map_err(
|err| warn!(log, "Failed to convert metrics to utf8"; "error" => %err),
)
});

match body {
Ok(body) => {
*response.body_mut() = body;
}
Err(_) => {
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
}
}
}
_ => {
*response.status_mut() = StatusCode::NOT_FOUND;
}
};

response
}

impl Default for Metrics {
fn default() -> Self {
Metrics::new(None, Registry::default())
}
pub(crate) registry: Registry,
}

impl Metrics {
pub fn new(addr: Option<SocketAddr>, registry: Registry) -> Self {
Metrics { addr, registry }
pub fn new(base: &Logger, registry: Registry) -> Self {
Metrics {
log: base.new(o!("source" => "proxy::Metrics")),
registry,
}
}

pub fn new_session_metrics(
Expand All @@ -132,4 +48,46 @@ impl Metrics {
upstream.to_string(),
)
}

pub fn admin_response(&self) -> Response<Body> {
let mut response = Response::new(Body::empty());
let mut buffer = vec![];
let encoder = TextEncoder::new();
let body = encoder
.encode(&self.registry.gather(), &mut buffer)
.map_err(|err| warn!(self.log, "Failed to encode metrics"; "error" => %err))
.and_then(|_| {
String::from_utf8(buffer).map(Body::from).map_err(
|err| warn!(self.log, "Failed to convert metrics to utf8"; "error" => %err),
)
});

match body {
Ok(body) => {
*response.body_mut() = body;
}
Err(_) => {
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
}
};

response
}
}

#[cfg(test)]
mod tests {
use hyper::StatusCode;
use prometheus::Registry;

use crate::proxy::Metrics;
use crate::test_utils::logger;

#[tokio::test]
async fn admin_response() {
let log = logger();
let metrics = Metrics::new(&log, Registry::default());
let response = metrics.admin_response();
assert_eq!(response.status(), StatusCode::OK);
}
}
2 changes: 2 additions & 0 deletions src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* limitations under the License.
*/

pub use admin::Admin;
pub use builder::{logger, Builder};
pub use metrics::Metrics;
pub use server::Server;

mod admin;
mod builder;
mod metrics;
mod server;
Expand Down
Loading

0 comments on commit a99ddf5

Please sign in to comment.