Skip to content

Commit

Permalink
feat: telemetery
Browse files Browse the repository at this point in the history
  • Loading branch information
chesedo committed Aug 23, 2022
1 parent 98f1182 commit 7fe6ea1
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 13 deletions.
92 changes: 92 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ log = "0.4.17"
tokio = { version = "1.19.2", features = ["fs"] }
sqlx = { version = "0.6.0", features = ["runtime-tokio-native-tls", "sqlite", "chrono", "json", "uuid"] }
tower = { version = "0.4.12", features = ["make"] }
tower-http = { version = "0.3.4", features = ["trace"] }
axum = { version = "0.5.7", features = ["ws"] }
bytes = "1.1.0"
serde = { version = "1.0.137", features = ["derive"] }
Expand All @@ -24,6 +25,9 @@ tar = "0.4.38"
rand = "0.8.5"
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
tracing-opentelemetry = "0.17.4"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] }
cargo = "0.62.0"
uuid = { version = "1.1.2", features = ["v4"] }
tonic = "0.7.2"
Expand Down
27 changes: 19 additions & 8 deletions deployer/src/deployment/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,11 @@ pub struct Queued {
}

impl Queued {
#[instrument(skip(self, log_recorder), fields(id = %self.id, state = %State::Building))]
async fn handle(mut self, log_recorder: impl LogRecorder) -> Result<Built> {
#[instrument(name = "queued_handle", skip(self, log_recorder), fields(id = %self.id, state = %State::Building))]
async fn handle(self, log_recorder: impl LogRecorder) -> Result<Built> {
info!("Fetching POSTed data");

let mut vec = Vec::new();
while let Some(buf) = self.data_stream.next().await {
let buf = buf?;
debug!("Received {} bytes", buf.len());
vec.put(buf);
}
let vec = extract_stream(self.data_stream).await?;

info!("Extracting received data");

Expand Down Expand Up @@ -168,7 +163,22 @@ impl fmt::Debug for Queued {
}
}

#[instrument(skip(data_stream))]
async fn extract_stream(
mut data_stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
) -> Result<Vec<u8>> {
let mut vec = Vec::new();
while let Some(buf) = data_stream.next().await {
let buf = buf?;
debug!("Received {} bytes", buf.len());
vec.put(buf);
}

Ok(vec)
}

/// Equivalent to the command: `tar -xzf --strip-components 1`
#[instrument(skip(data, dest))]
fn extract_tar_gz_data(data: impl Read, dest: impl AsRef<Path>) -> Result<()> {
let tar = GzDecoder::new(data);
let mut archive = Archive::new(tar);
Expand All @@ -183,6 +193,7 @@ fn extract_tar_gz_data(data: impl Read, dest: impl AsRef<Path>) -> Result<()> {
Ok(())
}

#[instrument(skip(project_path))]
fn run_pre_deploy_tests(project_path: impl AsRef<Path>) -> Result<()> {
let config = CargoConfig::default().map_err(|e| Error::Build(e.into()))?;
let manifest_path = project_path.as_ref().join("Cargo.toml");
Expand Down
2 changes: 1 addition & 1 deletion deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub struct Built {
}

impl Built {
#[instrument(skip(self, factory, logger, cleanup), fields(id = %self.id, state = %State::Running))]
#[instrument(name = "built_handle", skip(self, factory, logger, cleanup), fields(id = %self.id, state = %State::Running))]
async fn handle(
self,
addr: SocketAddr,
Expand Down
23 changes: 19 additions & 4 deletions deployer/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use axum::body::Body;
use axum::extract::ws::WebSocket;
use axum::extract::{ws, Extension, Path, Query};
use axum::body::{Body, BoxBody};
use axum::extract::ws::{self, WebSocket};
use axum::extract::{Extension, Path, Query};
use axum::http::{Request, Response};
use axum::routing::{get, Router};
use axum::{extract::BodyStream, Json};
use chrono::{TimeZone, Utc};
use futures::TryStreamExt;
use shuttle_common::BuildLog;
use tracing::error;
use tower_http::trace::TraceLayer;
use tracing::{debug, debug_span, error, field, Span};
use uuid::Uuid;

use crate::deployment::{DeploymentManager, Queued, State};
use crate::error::{Error, Result};
use crate::persistence::{Deployment, Persistence};

use std::collections::HashMap;
use std::time::Duration;

pub fn make_router(
persistence: Persistence,
Expand All @@ -36,6 +39,18 @@ pub fn make_router(
.route("/deployments/:id/build-logs", get(get_build_logs))
.layer(Extension(persistence))
.layer(Extension(deployment_manager))
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
debug_span!("request", http.uri = %request.uri(), http.method = %request.method(), http.status_code = field::Empty)
})
.on_response(
|response: &Response<BoxBody>, latency: Duration, span: &Span| {
span.record("http.status_code", &response.status().as_u16());
debug!(latency = format_args!("{} ns", latency.as_nanos()), "finished processing request");
},
),
)
}

async fn list_services(
Expand Down
6 changes: 6 additions & 0 deletions deployer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ async fn main() {
.unwrap();

let (persistence, _) = Persistence::new().await;
let tracer = opentelemetry_datadog::new_pipeline()
.with_service_name("deployer")
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(DeployLayer::new(persistence.clone()))
.with(filter_layer)
.with(fmt_layer)
.with(opentelemetry)
.init();

let provisioner_uri = Endpoint::try_from(format!(
Expand Down
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ in
grpcurl
gh
docker-compose
datadog-agent
];

PROTOC = "${protobuf}/bin/protoc";
Expand Down

0 comments on commit 7fe6ea1

Please sign in to comment.