Skip to content

Commit

Permalink
feat: add --head and --tail to logs command (#1629)
Browse files Browse the repository at this point in the history
Co-authored-by: biplab5464 <[email protected]>
Co-authored-by: jonaro00 <[email protected]>
Co-authored-by: oddgrd <[email protected]>
  • Loading branch information
4 people authored Apr 22, 2024
1 parent 13b334f commit b5116c7
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 21 deletions.
9 changes: 9 additions & 0 deletions cargo-shuttle/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,15 @@ pub struct LogsArgs {
/// Don't display timestamps and log origin tags
#[arg(long)]
pub raw: bool,
/// View the first N log lines
#[arg(long, group = "output_mode")]
pub head: Option<u32>,
/// View the last N log lines
#[arg(long, group = "output_mode")]
pub tail: Option<u32>,
/// View all log lines
#[arg(long, group = "output_mode")]
pub all: bool,
}

/// Helper function to parse and return the absolute path
Expand Down
29 changes: 26 additions & 3 deletions cargo-shuttle/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reqwest::RequestBuilder;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use shuttle_common::constants::headers::X_CARGO_SHUTTLE_VERSION;
use shuttle_common::log::LogsRange;
use shuttle_common::models::deployment::DeploymentRequest;
use shuttle_common::models::organization;
use shuttle_common::models::{deployment, project, service, ToJson};
Expand Down Expand Up @@ -193,8 +194,14 @@ impl Client {
self.get(path).await
}

pub async fn get_logs(&self, project: &str, deployment_id: &Uuid) -> Result<Vec<LogItem>> {
let path = format!("/projects/{project}/deployments/{deployment_id}/logs");
pub async fn get_logs(
&self,
project: &str,
deployment_id: &Uuid,
range: LogsRange,
) -> Result<Vec<LogItem>> {
let mut path = format!("/projects/{project}/deployments/{deployment_id}/logs");
Self::add_range_query(range, &mut path);

self.get(path)
.await
Expand All @@ -205,12 +212,28 @@ impl Client {
&self,
project: &str,
deployment_id: &Uuid,
range: LogsRange,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let path = format!("/projects/{project}/ws/deployments/{deployment_id}/logs");
let mut path = format!("/projects/{project}/ws/deployments/{deployment_id}/logs");
Self::add_range_query(range, &mut path);

self.ws_get(path).await
}

fn add_range_query(range: LogsRange, path: &mut String) {
match range {
LogsRange::Head(n) => {
path.push_str("?head=");
path.push_str(&n.to_string())
}
LogsRange::Tail(n) => {
path.push_str("?tail=");
path.push_str(&n.to_string())
}
_ => {}
};
}

pub async fn get_deployments(
&self,
project: &str,
Expand Down
15 changes: 11 additions & 4 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use shuttle_common::{
SHUTTLE_INSTALL_DOCS_URL, SHUTTLE_LOGIN_URL, STORAGE_DIRNAME, TEMPLATES_SCHEMA_VERSION,
},
deployment::{DEPLOYER_END_MESSAGES_BAD, DEPLOYER_END_MESSAGES_GOOD},
log::LogsRange,
models::{
deployment::{
get_deployments_table, DeploymentRequest, CREATE_SERVICE_BODY_LIMIT,
Expand Down Expand Up @@ -839,6 +840,12 @@ impl Shuttle {
}

async fn logs(&self, args: LogsArgs) -> Result<CommandOutcome> {
let range = match (args.head, args.tail, args.all) {
(Some(num), _, _) => LogsRange::Head(num),
(_, Some(num), _) => LogsRange::Tail(num),
(_, _, true) => LogsRange::All,
_ => LogsRange::Tail(1000),
};
let client = self.client.as_ref().unwrap();
let id = if let Some(id) = args.id {
id
Expand Down Expand Up @@ -874,7 +881,7 @@ impl Shuttle {

if args.follow {
let mut stream = client
.get_logs_ws(self.ctx.project_name(), &id)
.get_logs_ws(self.ctx.project_name(), &id, range)
.await
.map_err(|err| {
suggestions::logs::get_logs_failure(err, "Connecting to the logs stream failed")
Expand Down Expand Up @@ -906,7 +913,7 @@ impl Shuttle {
}
} else {
let logs = client
.get_logs(self.ctx.project_name(), &id)
.get_logs(self.ctx.project_name(), &id, range)
.await
.map_err(|err| {
suggestions::logs::get_logs_failure(err, "Fetching the deployment failed")
Expand Down Expand Up @@ -1687,7 +1694,7 @@ impl Shuttle {
.map_err(suggestions::deploy::deploy_request_failure)?;

let mut stream = client
.get_logs_ws(self.ctx.project_name(), &deployment.id)
.get_logs_ws(self.ctx.project_name(), &deployment.id, LogsRange::All)
.await
.map_err(|err| {
suggestions::deploy::deployment_setup_failure(
Expand Down Expand Up @@ -1807,7 +1814,7 @@ impl Shuttle {
// the terminal isn't completely spammed
sleep(Duration::from_millis(100)).await;
stream = client
.get_logs_ws(self.ctx.project_name(), &deployment.id)
.get_logs_ws(self.ctx.project_name(), &deployment.id, LogsRange::All)
.await
.map_err(|err| {
suggestions::deploy::deployment_setup_failure(
Expand Down
8 changes: 8 additions & 0 deletions common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub enum Backend {
Runtime(String),
}

/// Which subset of deployment log lines to process
#[derive(Deserialize)]
pub enum LogsRange {
Head(u32),
Tail(u32),
All,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LogItem {
/// Deployment id
Expand Down
18 changes: 16 additions & 2 deletions deployer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ pub struct PaginationDetails {
pub limit: Option<u32>,
}

#[derive(Deserialize)]
struct LogsQuery {
head: Option<u32>,
tail: Option<u32>,
}
#[derive(Clone)]
pub struct RouterBuilder {
router: Router,
Expand Down Expand Up @@ -424,9 +429,12 @@ pub async fn get_logs(
Extension(deployment_manager): Extension<DeploymentManager>,
Extension(claim): Extension<Claim>,
CustomErrorPath((project_name, deployment_id)): CustomErrorPath<(String, Uuid)>,
Query(LogsQuery { head, tail }): Query<LogsQuery>,
) -> Result<Json<Vec<LogItem>>> {
let mut logs_request: tonic::Request<LogsRequest> = tonic::Request::new(LogsRequest {
deployment_id: deployment_id.to_string(),
head,
tail,
});

logs_request.extensions_mut().insert(claim);
Expand Down Expand Up @@ -464,20 +472,26 @@ pub async fn get_logs_subscribe(
Extension(deployment_manager): Extension<DeploymentManager>,
Extension(claim): Extension<Claim>,
CustomErrorPath((project_name, deployment_id)): CustomErrorPath<(String, Uuid)>,
Query(LogsQuery { head, tail }): Query<LogsQuery>,
ws_upgrade: ws::WebSocketUpgrade,
) -> axum::response::Response {
ws_upgrade
.on_upgrade(move |s| logs_websocket_handler(s, deployment_manager, deployment_id, claim))
ws_upgrade.on_upgrade(move |s| {
logs_websocket_handler(s, deployment_manager, deployment_id, head, tail, claim)
})
}

async fn logs_websocket_handler(
mut s: WebSocket,
deployment_manager: DeploymentManager,
deployment_id: Uuid,
head: Option<u32>,
tail: Option<u32>,
claim: Claim,
) {
let mut logs_request: tonic::Request<LogsRequest> = tonic::Request::new(LogsRequest {
deployment_id: deployment_id.to_string(),
head,
tail,
});

logs_request.extensions_mut().insert(claim);
Expand Down
48 changes: 41 additions & 7 deletions logger/src/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use prost_types::Timestamp;
use shuttle_common::log::LogsRange;
use shuttle_proto::logger::{LogItem, LogLine};
use sqlx::{
migrate::Migrator,
Expand Down Expand Up @@ -45,7 +46,12 @@ impl fmt::Display for DalError {
#[async_trait]
pub trait Dal {
/// Get logs for a deployment
async fn get_logs(&self, deployment_id: String) -> Result<Vec<Log>, DalError>;
async fn get_logs(
&self,
deployment_id: String,
head: Option<u32>,
tail: Option<u32>,
) -> Result<Vec<Log>, DalError>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -150,13 +156,41 @@ impl Postgres {

#[async_trait]
impl Dal for Postgres {
async fn get_logs(&self, deployment_id: String) -> Result<Vec<Log>, DalError> {
let result =
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = $1 ORDER BY tx_timestamp")
.bind(deployment_id)
.fetch_all(&self.pool)
.await?;
async fn get_logs(
&self,
deployment_id: String,
head: Option<u32>,
tail: Option<u32>,
) -> Result<Vec<Log>, DalError> {
let mode = match (head, tail) {
(Some(len), None) => LogsRange::Head(len),
(None, Some(len)) => LogsRange::Tail(len),
(None, None) => LogsRange::All,
_ => LogsRange::Tail(1000),
};

let result = match mode {
LogsRange::Head(len) => {
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = $1 ORDER BY tx_timestamp limit $2")
.bind(deployment_id)
.bind(len as i64)
.fetch_all(&self.pool)
.await?
}
LogsRange::Tail(len) => {
sqlx::query_as("SELECT * FROM (SELECT * FROM logs WHERE deployment_id = $1 ORDER BY tx_timestamp DESC limit $2) AS TAIL_TABLE ORDER BY tx_timestamp")
.bind(deployment_id)
.bind(len as i64)
.fetch_all(&self.pool)
.await?
}
LogsRange::All => {
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = $1 ORDER BY tx_timestamp")
.bind(deployment_id)
.fetch_all(&self.pool)
.await?
}
};
Ok(result)
}
}
Expand Down
21 changes: 16 additions & 5 deletions logger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ where
Self { dal, logs_tx }
}

async fn get_logs(&self, deployment_id: String) -> Result<Vec<LogLine>, Error> {
let logs = self.dal.get_logs(deployment_id).await?;
async fn get_logs(
&self,
deployment_id: String,
head: Option<u32>,
tail: Option<u32>,
) -> Result<Vec<LogLine>, Error> {
let logs = self.dal.get_logs(deployment_id, head, tail).await?;

Ok(logs.into_iter().map(Into::into).collect())
}
Expand Down Expand Up @@ -94,7 +99,9 @@ where
request.verify(Scope::Logs)?;

let request = request.into_inner();
let log_items = self.get_logs(request.deployment_id).await?;
let log_items = self
.get_logs(request.deployment_id, request.head, request.tail)
.await?;
let result = LogsResponse { log_items };

Ok(Response::new(result))
Expand All @@ -111,11 +118,15 @@ where

// Subscribe as soon as possible
let mut logs_rx = self.logs_tx.subscribe();
let LogsRequest { deployment_id } = request.into_inner();
let LogsRequest {
deployment_id,
head,
tail,
} = request.into_inner();
let (tx, rx) = mpsc::channel(1);

// Get logs before stream was started
let logs = self.get_logs(deployment_id.clone()).await?;
let logs = self.get_logs(deployment_id.clone(), head, tail).await?;

tokio::spawn(async move {
let mut last = Default::default();
Expand Down
4 changes: 4 additions & 0 deletions logger/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ mod needs_docker {
let logs = client
.get_logs(Request::new(LogsRequest {
deployment_id: deployment_id.into(),
head: None,
tail: None,
}))
.await
.unwrap()
Expand Down Expand Up @@ -156,6 +158,8 @@ mod needs_docker {
let mut response = client
.get_logs_stream(Request::new(LogsRequest {
deployment_id: deployment_id.into(),
head: None,
tail: None,
}))
.await
.unwrap()
Expand Down
2 changes: 2 additions & 0 deletions proto/logger.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ message StoreLogsResponse {

message LogsRequest {
string deployment_id = 1;
optional uint32 head = 2;
optional uint32 tail = 3;
}

message LogsResponse {
Expand Down
4 changes: 4 additions & 0 deletions proto/src/generated/logger.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b5116c7

Please sign in to comment.