Skip to content

Commit

Permalink
Merge pull request #9 from Northeastern-Electric-Racing/jack-test-1
Browse files Browse the repository at this point in the history
Jack test 1
  • Loading branch information
jr1221 authored Dec 23, 2024
2 parents 9714866 + c6e4586 commit 2178034
Show file tree
Hide file tree
Showing 13 changed files with 1,162 additions and 81 deletions.
859 changes: 856 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@ name = "odysseus-daemon"
version = "0.1.0"
edition = "2021"

[dependencies]
[workspace]
members = ["uploader"]

[workspace.dependencies]
reqwest = { version = "0.12.9", features = ["blocking", "multipart"] }
clap = { version = "4.5.23", features = ["derive", "env"] }


[dependencies]
clap.workspace = true
protobuf = "3.7.1"
rumqttc = "0.24.0"
serde = { version = "1.0.216", features = ["derive"] }
Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ Core principles:

Modules
- `visual`: Camera process manager and writer. Status: Beta
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Alpha
- `audible`: Call feature trigger and monitor. Status: Alpha
- `lockdown`: Feature disabler and modifier upon HV enablement. Status: Beta
- `audible`: Call feature trigger and monitor. Status: Beta
- `numerical`: Telemetry scraper and sender (tpu-telemetry python replacement). Status: Incomplete
- `logger`: MQTT receiver and disk logger. Status: Beta
- `logger`: MQTT receiver and disk logger. Status: Alpha

Upload modules:
- `logger`: Upload from the logger module to scylla. Status: Beta
- `visual`: Camera video uploader to cloud platform. Status: Incomplete
- `serial`: (from `lockdown` module) Serial output uploader to cloud platform. Status Incomplete
-


**This program will only run on Odysseus**
11 changes: 8 additions & 3 deletions src/audible.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error;

use tokio::sync::watch::Receiver;
use tokio::{process::Command, sync::watch::Receiver};
use tokio_util::sync::CancellationToken;

/// runs the mute/unmute functionality
Expand All @@ -11,11 +11,16 @@ pub async fn audible_manager(
loop {
tokio::select! {
_ = cancel_token.cancelled() => {

Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
},
new = mute_stat_recv.changed() => {
new?;

// to mute or not
if *mute_stat_recv.borrow_and_update() {
Command::new("linphonecsh").args(["generic", "mute"]).spawn()?.wait().await?;
} else {
Command::new("linphonecsh").args(["generic", "unmute"]).spawn()?.wait().await?;
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ pub const HV_EN_TOPIC: &str = "MPU/State/TSMS";

/// the topic to listen for mute enable, 1 is on 0 is off
pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute";

/// The save location for all files
pub static SAVE_LOCATION: std::sync::OnceLock<String> = std::sync::OnceLock::new();
103 changes: 84 additions & 19 deletions src/lockdown.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,52 @@
use std::error::Error;
use std::{error::Error, process::Stdio, time::Duration};

use tokio::{io, process::Command, sync::watch::Receiver};
use tokio::{
io,
process::{Child, Command},
sync::watch::Receiver,
};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use crate::HVTransition;
use crate::{HVTransition, SAVE_LOCATION};

/// Run various HV on/off lockdowns
/// Takes in a receiver of HV state
pub async fn lockdown_runner(
cancel_token: CancellationToken,
mut hv_stat_recv: Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut cmds: Option<(Child, Child)> = None;
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Err(err) = hv_transition_disabled().await {
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
break Ok(());
},
new = hv_stat_recv.changed() => {
new?;
let curr_data = *hv_stat_recv.borrow_and_update();
let curr_state = match curr_data {
HVTransition::TransitionOn(_) => true,
HVTransition::TransitionOff => false,
};
if curr_state {
info!("Locking down!");
if let Err(err) = hv_transition_enabled().await {
warn!("Could not lock down!!! {}", err);
}
} else {
info!("Unlocking!");
if let Err(err) = hv_transition_disabled().await {
match curr_data {
HVTransition::TransitionOn(hvon_data) => {
info!("Locking down!");
let Ok(children) = hv_transition_enabled(hvon_data.time_ms).await else {
warn!("Could not lock down!!!");
continue;
};
cmds = Some(children);
},
HVTransition::TransitionOff => {
info!("Unlocking!");
if let Err(err) = hv_transition_disabled(&mut cmds).await {
warn!("Could not unlock!!! {}", err);
}
},
}

}
Expand All @@ -45,7 +55,9 @@ pub async fn lockdown_runner(
}

/// Transition to HV on
pub async fn hv_transition_enabled() -> io::Result<()> {
pub async fn hv_transition_enabled(time_ms: u64) -> io::Result<(Child, Child)> {
// unbind from the usbipd server
// this automatically brings back
let mut cmd_cerb_dis = Command::new("usbip")
.args(["unbind", "--busid", "1-1.3"])
.spawn()?;
Expand All @@ -56,14 +68,62 @@ pub async fn hv_transition_enabled() -> io::Result<()> {
cmd_cerb_dis.wait().await?;
cmd_shep_dis.wait().await?;

tokio::time::sleep(Duration::from_secs(2)).await;

let mut cmd_cerb_conf = Command::new("stty")
.args(["-F", "/dev/ttyCerberus", "115200"])
.spawn()?;

let mut cmd_shep_conf = Command::new("stty")
.args(["-F", "/dev/ttyShepherd", "115200"])
.spawn()?;

cmd_cerb_conf.wait().await?;
cmd_shep_conf.wait().await?;

// TODO actually write the tty read from cat into a file, and

// if !cmd_cerb_dis.wait().await.unwrap().success() && !cmd_shep_dis.wait().await.unwrap().success() {
// info!("Failed to run USBIP command(s) to unbind");
// }
Ok(())
let cerb_save_loc = format!(
"{}/event-{}/cerberus-dump.cap",
SAVE_LOCATION.get().unwrap(),
time_ms
);
let shep_save_loc = format!(
"{}/event-{}/shepherd-dump.cap",
SAVE_LOCATION.get().unwrap(),
time_ms
);
Ok((
Command::new("minicom")
.args([
"-D",
"/dev/ttyCerberus",
"-O",
"timestamp=extended",
"-C",
&cerb_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
Command::new("minicom")
.args([
"-D",
"/dev/ttyCerberus",
"-O",
"timestamp=extended",
"-C",
&shep_save_loc,
])
.stdout(Stdio::null())
.spawn()?,
))
}

/// Transition to HV off
pub async fn hv_transition_disabled() -> io::Result<()> {
pub async fn hv_transition_disabled(child_writers: &mut Option<(Child, Child)>) -> io::Result<()> {
let mut cmd_cerb_rec = Command::new("usbip")
.args(["bind", "--busid", "1-1.3"])
.spawn()?;
Expand All @@ -74,6 +134,11 @@ pub async fn hv_transition_disabled() -> io::Result<()> {
cmd_cerb_rec.wait().await?;
cmd_shep_rec.wait().await?;

if let Some(child_writers) = child_writers {
child_writers.0.kill().await?;
child_writers.1.kill().await?;
}

// if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() {
// println!("Failed to run USBIP command(s) to unbind");
// }
Expand Down
49 changes: 43 additions & 6 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,70 @@
use std::error::Error;

use protobuf::Message;
use tokio::sync::mpsc::Receiver;
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::playback_data;
use crate::{playback_data, HVTransition, SAVE_LOCATION};

/// runs the mute/unmute functionality
/// Takes in a receiver of all MQTT messages
pub async fn logger_manager(
cancel_token: CancellationToken,
mut mqtt_recv_rx: Receiver<playback_data::PlaybackData>,
mut mqtt_recv_rx: tokio::sync::mpsc::Receiver<playback_data::PlaybackData>,
mut hv_stat_recv: tokio::sync::watch::Receiver<HVTransition>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut writer: Option<BufWriter<File>> = None;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
if let Some(writer) = writer.as_mut() {
return Ok(writer.flush().await?)
}
return Ok(())
},
new = hv_stat_recv.changed() => {
new?;
let val = *hv_stat_recv.borrow_and_update();
match val {
HVTransition::TransitionOn(hvon_data) => {
let filename = format!("{}/event-{}/data_dump.log", SAVE_LOCATION.get().unwrap(), hvon_data.time_ms);
writer = Some(BufWriter::new(File::create_new(filename).await.expect("Could not create log file!")));
},
HVTransition::TransitionOff => {
if let Some(writ) = writer.as_mut() {
writ.flush().await?;
writer = None;

} else {
warn!("Logger - Transition off was unexpected");
}
},
}

},
msg = mqtt_recv_rx.recv() => {
let parsed_msg = match msg {
if writer.is_none() {
continue;
}

match msg {
Some(msg) => {
msg
if let Some(writ) = writer.as_mut() {
if let Err(err) = writ.write(&msg.write_length_delimited_to_bytes().unwrap()).await {
warn!("Could not write to log! {}", err);
}
}
},
None => {
warn!("Could not receive message!");
continue;
}
};
}
//println!("{:?}", parsed_msg.write_to_bytes());
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use odysseus_daemon::{
numerical::collect_data,
playback_data,
visual::{run_save_pipeline, SavePipelineOpts},
HVTransition, PublishableMessage,
HVTransition, PublishableMessage, SAVE_LOCATION,
};
use rumqttc::v5::AsyncClient;
use tokio::{
Expand Down Expand Up @@ -99,6 +99,9 @@ async fn main() {
// use that subscriber to process traces emitted after this point
tracing::subscriber::set_global_default(subscriber).expect("Could not init tracing");

// set save location
SAVE_LOCATION.get_or_init(|| cli.output_folder);

// channel to pass the mqtt data
// TODO tune buffer size
let (mqtt_sender_tx, mqtt_sender_rx) = mpsc::channel::<PublishableMessage>(1000);
Expand Down Expand Up @@ -134,7 +137,6 @@ async fn main() {
cli.mock,
mute_stat_send,
mqtt_recv_tx,
cli.output_folder.clone(),
MqttProcessorOptions {
mqtt_path: cli.mqtt_url,
},
Expand All @@ -154,7 +156,6 @@ async fn main() {
video: cli
.video_uri
.expect("Must provide video URI if video is enabled!"),
save_location: cli.output_folder,
},
));
}
Expand All @@ -165,7 +166,7 @@ async fn main() {

if cli.lockdown {
info!("Running lockdown module");
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv));
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv.clone()));
}

if cli.audible {
Expand All @@ -174,7 +175,11 @@ async fn main() {
}
if cli.logger {
info!("Running logger module");
task_tracker.spawn(logger_manager(token.clone(), mqtt_recv_rx.unwrap()));
task_tracker.spawn(logger_manager(
token.clone(),
mqtt_recv_rx.unwrap(),
hv_stat_recv.clone(),
));
}

task_tracker.close();
Expand Down
Loading

0 comments on commit 2178034

Please sign in to comment.