Skip to content

Commit

Permalink
lighthouse, manager: dashboard kill and heartbeat old ui (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
d4l3k authored Nov 9, 2024
1 parent 1b5597b commit faf90ec
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 15 deletions.
6 changes: 6 additions & 0 deletions proto/torchft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ message ShouldCommitResponse {
bool should_commit = 1;
}

// Request payload for the ManagerService.Kill rpc.
message KillRequest {
// Free-form text explaining why the kill was requested; the receiving
// manager logs it before terminating.
string msg = 1;
}
// Response for the ManagerService.Kill rpc; carries no fields.
message KillResponse {}

// RPC surface exposed by each manager process.
service ManagerService {
rpc Quorum (ManagerQuorumRequest) returns (ManagerQuorumResponse);
rpc CheckpointAddress(CheckpointAddressRequest) returns (CheckpointAddressResponse);
rpc ShouldCommit(ShouldCommitRequest) returns (ShouldCommitResponse);
// Asks the manager to terminate. The Rust handler logs `msg` at warn level
// and exits the process with status 1.
rpc Kill(KillRequest) returns (KillResponse);
}
83 changes: 76 additions & 7 deletions src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use anyhow::Result;
use anyhow::{anyhow, Result};
use askama::Template;
use axum::{response::Html, routing::get, Router};
use axum::{
extract::Path,
http::StatusCode,
response::{Html, IntoResponse},
routing::{get, post},
Router,
};
use gethostname::gethostname;
use log::{error, info};
use structopt::StructOpt;
Expand All @@ -24,9 +30,10 @@ use tonic::service::Routes;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

use crate::manager::manager_client_new;
use crate::torchftpb::{
lighthouse_service_server::{LighthouseService, LighthouseServiceServer},
LighthouseHeartbeatRequest, LighthouseHeartbeatResponse, LighthouseQuorumRequest,
KillRequest, LighthouseHeartbeatRequest, LighthouseHeartbeatResponse, LighthouseQuorumRequest,
LighthouseQuorumResponse, Quorum, QuorumMember,
};

Expand Down Expand Up @@ -203,8 +210,6 @@ impl Lighthouse {
bind.port()
);

let self_clone = self.clone();

// Setup HTTP endpoints
let app = Router::new()
.route(
Expand All @@ -213,7 +218,17 @@ impl Lighthouse {
)
.route(
"/status",
get(move || async { self_clone.get_status().await }),
get({
let self_clone = self.clone();
move || async { self_clone.get_status().await }
}),
)
.route(
"/replica/:replica_id/kill",
post({
let self_clone = self.clone();
move |path| async { self_clone.kill(path).await }
}),
);

// register the GRPC service
Expand Down Expand Up @@ -249,10 +264,38 @@ impl Lighthouse {
quorum_id: state.quorum_id,
prev_quorum: state.prev_quorum.clone(),
heartbeats: state.heartbeats.clone(),
old_age_threshold: Instant::now()
.checked_sub(Duration::from_secs(1))
.unwrap_or(Instant::now()),
}
};
Html(template.render().unwrap())
}

/// Handles `POST /replica/:replica_id/kill` from the dashboard.
///
/// Looks up `replica_id` among the participants of the previous quorum, opens a manager
/// client to that participant's address and sends it a `KillRequest`.
///
/// # Errors
///
/// Returns an `AppError` when there is no previous quorum or none of its participants has
/// the given replica id ("failed to find replica"), when the manager client cannot be
/// created, or when the `Kill` RPC returns an error.
async fn kill(self: Arc<Self>, Path(replica_id): Path<String>) -> Result<(), AppError> {
    // Resolve the address in its own scope so the state lock is released before any
    // network I/O is attempted.
    let addr = {
        let state = self.state.lock().await;

        // Borrow the quorum rather than cloning it: only the matching member's address
        // has to be copied out from under the lock.
        let found = state.prev_quorum.as_ref().and_then(|quorum| {
            quorum
                .participants
                .iter()
                .find(|member| member.replica_id == replica_id)
        });

        match found {
            Some(member) => member.address.clone(),
            None => return Err(AppError(anyhow!("failed to find replica"))),
        }
    };

    // The second argument is presumably a connect timeout — confirm against
    // `manager_client_new`.
    let mut client = manager_client_new(addr, Duration::from_secs(10)).await?;

    let request = tonic::Request::new(KillRequest {
        msg: "killed from dashboard".to_string(),
    });
    // NOTE(review): the manager-side handler exits the process instead of replying, so this
    // call presumably reports an error even when the kill took effect — confirm whether
    // that should be surfaced to the dashboard as a failure.
    let _resp = client.kill(request).await?;

    Ok(())
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -318,11 +361,37 @@ struct IndexTemplate {}
/// View model rendered by the `status.html` template.
#[derive(Template)]
#[template(path = "status.html")]
struct StatusTemplate {
    /// `quorum_id` copied from the lighthouse state at render time.
    quorum_id: i64,
    /// Snapshot of the previous quorum, if one exists; its participants are listed on the
    /// status page.
    prev_quorum: Option<Quorum>,
    /// Snapshot of the heartbeat map; the template iterates its keys and shows the age of
    /// each entry.
    heartbeats: HashMap<String, Instant>,
    /// Cut-off instant: the template marks a heartbeat as `old` when its timestamp is
    /// earlier than this value.
    old_age_threshold: Instant,
}

// Make our own error that wraps `anyhow::Error`.
// HTTP handlers return it as their error type so a failure can be turned into a response.
struct AppError(anyhow::Error);

// Lets axum render an `AppError` as an HTTP reply: status 500 with the wrapped error's
// display text in the body.
impl IntoResponse for AppError {
    fn into_response(self) -> axum::response::Response {
        let Self(cause) = self;
        let body = format!("Something went wrong: {}", cause);
        (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
    }
}

// This enables using `?` on functions that return `Result<_, anyhow::Error>` to turn them into
// `Result<_, AppError>`. That way you don't need to do that manually.
impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -442,7 +511,7 @@ mod tests {
replica_id: "foo".to_string(),
});

let response = client.heartbeat(request).await.unwrap();
let _response = client.heartbeat(request).await.unwrap();
}

{
Expand Down
17 changes: 11 additions & 6 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
use crate::torchftpb::manager_service_client::ManagerServiceClient;
use crate::torchftpb::{
manager_service_server::{ManagerService, ManagerServiceServer},
CheckpointAddressRequest, CheckpointAddressResponse, LighthouseHeartbeatRequest,
LighthouseQuorumRequest, ManagerQuorumRequest, ManagerQuorumResponse, Quorum, QuorumMember,
ShouldCommitRequest, ShouldCommitResponse,
CheckpointAddressRequest, CheckpointAddressResponse, KillRequest, KillResponse,
LighthouseHeartbeatRequest, LighthouseQuorumRequest, ManagerQuorumRequest,
ManagerQuorumResponse, Quorum, QuorumMember, ShouldCommitRequest, ShouldCommitResponse,
};

#[cfg(not(test))]
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Manager {
replica_id: self.replica_id.clone(),
});

let response = client.heartbeat(request).await;
let _response = client.heartbeat(request).await;

sleep(Duration::from_millis(100)).await;
}
Expand Down Expand Up @@ -333,12 +333,17 @@ impl ManagerService for Arc<Manager> {
};
Ok(Response::new(reply))
}

/// Handles the `Kill` RPC: logs the message carried by the request at warn level, then
/// terminates the whole process with exit status 1.
///
/// No `KillResponse` is ever produced, because the process exits before returning.
async fn kill(&self, request: Request<KillRequest>) -> Result<Response<KillResponse>, Status> {
    let msg = request.into_inner().msg;
    warn!("got kill request: {}", msg);

    std::process::exit(1);
}
}

#[cfg(test)]
mod tests {
use tokio::time::sleep;

use super::*;

async fn should_commit(rank: i64, should_commit: bool) -> Result<ShouldCommitResponse> {
Expand Down
2 changes: 1 addition & 1 deletion templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
padding: 10px;
border: 1px solid #333;
}
.hearbeat.old {
.heartbeat.old {
color: red;
}
</style>
Expand Down
7 changes: 6 additions & 1 deletion templates/status.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ <h3>Previous Quorum</h3>
Step: {{ member.step }} <br/>
Manager: {{ member.address }} <br/>
TCPStore: {{ member.store_address }}

<button hx-post="/replica/{{member.replica_id}}/kill"
hx-trigger="click">
Kill
</button>
</div>

{% endfor %}
Expand All @@ -28,7 +33,7 @@ <h3>Heartbeats</h3>
{% for replica_id in heartbeats.keys() %}

{% let age = heartbeats[replica_id].elapsed().as_secs_f64() %}
<li class="heartbeat">
<li class="heartbeat {% if heartbeats[replica_id].lt(old_age_threshold) %}old{%endif%}">
{{ replica_id }}: seen {{ age }}s ago
</li>

Expand Down

0 comments on commit faf90ec

Please sign in to comment.