
Commit

add disk metrics endpoint (#1348)
* add disk metrics endpoint

* Fix tags, openapi

* Use volume ID as upstairs ID

* Make clippy happy

* Add integration test for metrics collection

* Unprivileged_access added to VerifyEndpoint

* Add limits, fix pagination (hopefully. Tests incoming)

* Add test for limit

* Fix pagination, add tests

* NotFound -> Empty Vec in Nexus

* Fix merge

Co-authored-by: Sean Klein <[email protected]>
iliana and smklein authored Jul 26, 2022
1 parent 948e537 commit e9554ad
Showing 22 changed files with 1,356 additions and 35 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

123 changes: 117 additions & 6 deletions nexus/src/app/oximeter.rs
@@ -8,7 +8,9 @@ use crate::authz;
use crate::context::OpContext;
use crate::db;
use crate::db::identity::Asset;
use crate::external_api::params::ResourceMetrics;
use crate::internal_api::params::OximeterInfo;
use dropshot::PaginationParams;
use internal_dns_client::{
multiclient::{ResolveError, Resolver},
names::{ServiceName, SRV},
@@ -21,6 +23,8 @@ use omicron_common::api::external::PaginationOrder;
use omicron_common::api::internal::nexus;
use omicron_common::backoff;
use oximeter_client::Client as OximeterClient;
use oximeter_db::query::Timestamp;
use oximeter_db::Measurement;
use oximeter_db::TimeseriesSchema;
use oximeter_db::TimeseriesSchemaPaginationParams;
use oximeter_producer::register;
@@ -212,14 +216,112 @@ impl super::Nexus {
.map_err(|e| Error::internal_error(&e.to_string()))?
.timeseries_schema_list(&pag_params.page, limit)
.await
.map_err(map_oximeter_err)
}

/// Returns results from the timeseries DB based on the provided query
/// parameters.
///
/// * `timeseries_name`: The "target:metric" name identifying the metric to
/// be queried.
/// * `criteria`: Any additional parameters to help narrow down the query
/// selection further. These parameters are passed directly to
/// [oximeter::db::Client::select_timeseries_with].
/// * `query_params`: Pagination parameter, identifying which page of
/// results to return.
/// * `limit`: The maximum number of results to return in a paginated
/// request.
pub async fn select_timeseries(
&self,
timeseries_name: &str,
criteria: &[&str],
query_params: PaginationParams<ResourceMetrics, ResourceMetrics>,
limit: NonZeroU32,
) -> Result<dropshot::ResultsPage<Measurement>, Error> {
#[inline]
fn no_results() -> dropshot::ResultsPage<Measurement> {
dropshot::ResultsPage { next_page: None, items: Vec::new() }
}

let (start_time, end_time, query) = match query_params.page {
// Generally, we want the time bounds to be inclusive for the
// start time, and exclusive for the end time...
dropshot::WhichPage::First(query) => (
Timestamp::Inclusive(query.start_time),
Timestamp::Exclusive(query.end_time),
query,
),
// ... but for subsequent pages, we use the "last observed"
// timestamp as the start time. If we used an inclusive bound,
// we'd duplicate the returned measurement. To return each
// measurement exactly once, we make the start time "exclusive"
// on all "next" pages.
dropshot::WhichPage::Next(query) => (
Timestamp::Exclusive(query.start_time),
Timestamp::Exclusive(query.end_time),
query,
),
};
if query.start_time >= query.end_time {
return Ok(no_results());
}

let timeseries_list = self
.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.select_timeseries_with(
timeseries_name,
criteria,
Some(start_time),
Some(end_time),
Some(limit),
)
.await
.or_else(|err| {
// If the timeseries name exists in the API, but not in Clickhouse,
// it might just not have been populated yet.
match err {
oximeter_db::Error::TimeseriesNotFound(_) => Ok(vec![]),
_ => Err(err),
}
})
.map_err(map_oximeter_err)?;

if timeseries_list.len() > 1 {
return Err(Error::internal_error(&format!(
"expected 1 timeseries but got {} ({:?} {:?})",
timeseries_list.len(),
timeseries_name,
criteria
)));
}

// If we received no data, exit early.
let timeseries =
if let Some(timeseries) = timeseries_list.into_iter().next() {
timeseries
} else {
return Ok(no_results());
};

Ok(dropshot::ResultsPage::new(
timeseries.measurements,
&query,
|last_measurement: &Measurement, query: &ResourceMetrics| {
ResourceMetrics {
start_time: last_measurement.timestamp(),
end_time: query.end_time,
}
},
)
.unwrap())
}

// Internal helper to build an Oximeter client from its ID and address (common data between
@@ -259,3 +361,12 @@ impl super::Nexus {
Ok((self.build_oximeter_client(&id, address), id))
}
}

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },
}
}
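
Not part of the diff above, but worth spelling out: `select_timeseries` treats the first page's start time as inclusive and every later page's start time as exclusive, because later pages begin at the timestamp of the last measurement already returned. A minimal, self-contained sketch of that bound selection, using stand-in types (the real `Timestamp` lives in `oximeter_db::query`, and the real page type is `params::ResourceMetrics`):

use chrono::{DateTime, Utc};

// Stand-in for oximeter_db::query::Timestamp.
enum Bound {
    Inclusive(DateTime<Utc>),
    Exclusive(DateTime<Utc>),
}

// Stand-in for dropshot::WhichPage over the metrics page-selection params.
enum Page {
    First { start_time: DateTime<Utc>, end_time: DateTime<Utc> },
    Next { start_time: DateTime<Utc>, end_time: DateTime<Utc> },
}

fn bounds(page: &Page) -> (Bound, Bound) {
    match page {
        // First page: include the caller's start time.
        Page::First { start_time, end_time } => {
            (Bound::Inclusive(*start_time), Bound::Exclusive(*end_time))
        }
        // Later pages: the start time is the last measurement already
        // returned, so exclude it to avoid reporting it twice.
        Page::Next { start_time, end_time } => {
            (Bound::Exclusive(*start_time), Bound::Exclusive(*end_time))
        }
    }
}
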
70 changes: 70 additions & 0 deletions nexus/src/external_api/http_entrypoints.rs
@@ -67,6 +67,7 @@ use omicron_common::api::external::Saga;
use omicron_common::api::external::VpcFirewallRuleUpdateParams;
use omicron_common::api::external::VpcFirewallRules;
use omicron_common::bail_unless;
use parse_display::Display;
use ref_cast::RefCast;
use schemars::JsonSchema;
use serde::Deserialize;
@@ -115,6 +116,7 @@ pub fn external_api() -> NexusApiDescription {
api.register(disk_view)?;
api.register(disk_view_by_id)?;
api.register(disk_delete)?;
api.register(disk_metrics_list)?;

api.register(instance_list)?;
api.register(instance_create)?;
@@ -1515,6 +1517,65 @@ async fn disk_delete(
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

#[derive(Display, Deserialize, JsonSchema)]
#[display(style = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum DiskMetricName {
Activated,
Flush,
Read,
ReadBytes,
Write,
WriteBytes,
}
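
// Illustrative note, not part of this change: with parse_display's
// `snake_case` style, `format!("crucible_upstairs:{}", DiskMetricName::ReadBytes)`
// should render as "crucible_upstairs:read_bytes", which is the timeseries
// name the handler below hands to `select_timeseries`.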

/// Fetch metrics for a disk.
#[endpoint {
method = GET,
path = "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}",
tags = ["disks"],
}]
async fn disk_metrics_list(
rqctx: Arc<RequestContext<Arc<ServerContext>>>,
path_params: Path<MetricsPathParam<DiskPathParam, DiskMetricName>>,
query_params: Query<
PaginationParams<params::ResourceMetrics, params::ResourceMetrics>,
>,
) -> Result<HttpResponseOk<ResultsPage<oximeter_db::Measurement>>, HttpError> {
let apictx = rqctx.context();
let nexus = &apictx.nexus;

let path = path_params.into_inner();
let organization_name = &path.inner.organization_name;
let project_name = &path.inner.project_name;
let disk_name = &path.inner.disk_name;
let metric_name = path.metric_name;

let query = query_params.into_inner();
let limit = rqctx.page_limit(&query)?;

let handler = async {
let opctx = OpContext::for_external_api(&rqctx).await?;

// This ensures the user is authorized on Action::Read for this disk
let disk = nexus
.disk_fetch(&opctx, organization_name, project_name, disk_name)
.await?;
let upstairs_uuid = disk.id();
let result = nexus
.select_timeseries(
&format!("crucible_upstairs:{}", metric_name),
&[&format!("upstairs_uuid=={}", upstairs_uuid)],
query,
limit,
)
.await?;

Ok(HttpResponseOk(result))
};
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}
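// Illustrative only, not part of the diff: a first-page request would look
// roughly like the following; the exact query-string serialization of the
// page-selection parameters by dropshot is an assumption here.
//
//   GET /organizations/{org}/projects/{proj}/disks/{disk}/metrics/read_bytes
//       ?start_time=2022-07-26T00:00:00Z&end_time=2022-07-26T01:00:00Z&limit=100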

// Instances

/// List instances in a project.
@@ -4093,6 +4154,15 @@ async fn session_sshkey_delete(
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

/// Path parameters for metrics requests where `/metrics/{metric_name}` is
/// appended to an existing path parameter type
#[derive(Deserialize, JsonSchema)]
struct MetricsPathParam<T, M> {
#[serde(flatten)]
inner: T,
metric_name: M,
}
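
// For the disk metrics route above, `MetricsPathParam<DiskPathParam, DiskMetricName>`
// flattens `DiskPathParam`'s organization_name, project_name, and disk_name
// fields alongside `metric_name`, matching the four parameters in the
// endpoint's path.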

#[cfg(test)]
mod test {
use super::external_api;
23 changes: 20 additions & 3 deletions nexus/test-utils/src/lib.rs
@@ -159,6 +159,7 @@ pub async fn test_setup_with_config(
)
.await
.unwrap();
register_test_producer(&producer).unwrap();

ControlPlaneTestContext {
server,
@@ -253,6 +254,10 @@ impl oximeter::Producer for IntegrationProducer {
}
}

/// Creates and starts a producer server.
///
/// Actual producers can be registered with the [`register_producer`]
/// helper function.
pub async fn start_producer_server(
nexus_address: SocketAddr,
id: Uuid,
@@ -281,9 +286,22 @@
};
let server =
ProducerServer::start(&config).await.map_err(|e| e.to_string())?;
Ok(server)
}

/// Registers an arbitrary producer with the test server.
pub fn register_producer(
server: &ProducerServer,
producer: impl oximeter::Producer,
) -> Result<(), String> {
server.registry().register_producer(producer).map_err(|e| e.to_string())?;
Ok(())
}

/// Registers a sample-generating test-specific producer.
pub fn register_test_producer(server: &ProducerServer) -> Result<(), String> {
// Create and register an actual metric producer.
let test_producer = IntegrationProducer {
target: IntegrationTarget {
name: "integration-test-target".to_string(),
},
@@ -292,8 +310,7 @@
datum: 0,
},
};
register_producer(server, test_producer)
}
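
// Illustrative only, not part of the diff: a test that needs its own samples
// could build another producer (e.g. an `IntegrationProducer`, or anything
// implementing `oximeter::Producer`) and hand it to `register_producer`:
//
//   let producer = IntegrationProducer { /* target, metric */ };
//   register_producer(&producer_server, producer)?;
//
// where `producer_server` is the `ProducerServer` returned by
// `start_producer_server`.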

/// Returns whether the two identity metadata objects are identical.
13 changes: 10 additions & 3 deletions nexus/test-utils/src/resource_helpers.rs
@@ -188,28 +188,35 @@ pub async fn create_disk(
.await
}

/// Creates an instance with a default NIC and no disks.
///
/// Wrapper around [`create_instance_with`].
pub async fn create_instance(
client: &ClientTestContext,
organization_name: &str,
project_name: &str,
instance_name: &str,
) -> Instance {
create_instance_with(
client,
organization_name,
project_name,
instance_name,
&params::InstanceNetworkInterfaceAttachment::Default,
// Disks=
vec![],
)
.await
}
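
// Illustrative only, not part of the diff: a test that wants a disk attached
// at creation time would call `create_instance_with` directly, passing the
// desired `params::InstanceDiskAttachment` values in place of the empty
// `vec![]` (the attachment variants themselves are defined in the external
// API params and are not shown in this diff).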

/// Creates an instance with attached resources.
pub async fn create_instance_with(
client: &ClientTestContext,
organization_name: &str,
project_name: &str,
instance_name: &str,
nics: &params::InstanceNetworkInterfaceAttachment,
disks: Vec<params::InstanceDiskAttachment>,
) -> Instance {
let url = format!(
"/organizations/{}/projects/{}/instances",
@@ -231,7 +238,7 @@ pub async fn create_instance_with_nics(
.to_vec(),
network_interfaces: nics.clone(),
external_ips: vec![],
disks,
},
)
.await
