Skip to content

Commit

Permalink
misc untested improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Nov 9, 2024
1 parent cfb77c2 commit dd990ed
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 52 deletions.
22 changes: 22 additions & 0 deletions src/audible.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::error::Error;

use tokio::sync::watch::Receiver;
use tokio_util::sync::CancellationToken;

/// runs the mute/unmute functionality
pub async fn audible_manager(
cancel_token: CancellationToken,
mut mute_stat_recv: Receiver<bool>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {

},
new = mute_stat_recv.changed() => {
new?;

}
}
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod audible;
pub mod lockdown;
pub mod mqtt_handler;
pub mod numerical;
Expand All @@ -13,3 +14,6 @@ pub struct PublishableMessage {

/// the topic to listen for for HV enable, 1 is on 0 is off
pub const HV_EN_TOPIC: &str = "MPU/State/TSMS";

/// the topic to listen for mute enable, 1 is on 0 is off
pub const MUTE_EN_TOPIC: &str = "WHEEL/Buttons/Mute";
62 changes: 60 additions & 2 deletions src/lockdown.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,75 @@
use std::error::Error;

use tokio::sync::watch::Receiver;
use tokio::{process::Command, sync::watch::Receiver};
use tokio_util::sync::CancellationToken;
use tracing::info;

/// Run various HV on/off lockdowns
pub async fn lockdown_runner(
cancel_token: CancellationToken,
mut hv_stat_recv: Receiver<bool>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut prev_state = false;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
hv_transition_disabled().await;
break Ok(());
},
new = hv_stat_recv.changed() => {
new?;
info!("New HV state:{}", *hv_stat_recv.borrow_and_update());
let curr_state = *hv_stat_recv.borrow_and_update();
if prev_state == curr_state { continue } else{
prev_state = curr_state;
}

info!("New HV state: {}", curr_state);
if curr_state {
hv_transition_enabled().await;
} else {
hv_transition_disabled().await;
}

}
}
}
}

/// Transition to HV on
pub async fn hv_transition_enabled() {
let mut cmd_cerb_dis = Command::new("usbip")
.args(["unbind", "--busid", "1-1.3"])
.spawn()
.unwrap();
let mut cmd_shep_dis = Command::new("usbip")
.args(["unbind", "--busid", "1-1.4"])
.spawn()
.unwrap();

cmd_cerb_dis.wait().await.unwrap();
cmd_shep_dis.wait().await.unwrap();

// if !cmd_cerb_dis.wait().await.unwrap().success() && !cmd_shep_dis.wait().await.unwrap().success() {
// info!("Failed to run USBIP command(s) to unbind");
// }
}

/// Transition to HV off
pub async fn hv_transition_disabled() {
let mut cmd_cerb_rec = Command::new("usbip")
.args(["bind", "--busid", "1-1.3"])
.spawn()
.unwrap();
let mut cmd_shep_rec = Command::new("usbip")
.args(["bind", "--busid", "1-1.4"])
.spawn()
.unwrap();

cmd_cerb_rec.wait().await.unwrap();
cmd_shep_rec.wait().await.unwrap();

// if !cmd_cerb_rec.wait().await.unwrap().success() && !cmd_shep_rec.wait().await.unwrap().success() {
// println!("Failed to run USBIP command(s) to unbind");
// }
}
33 changes: 17 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use clap::Parser;
use odysseus_daemon::{
audible::audible_manager,
lockdown::lockdown_runner,
mqtt_handler::{MqttProcessor, MqttProcessorOptions},
numerical::collect_data,
Expand All @@ -24,15 +25,19 @@ use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
#[derive(Parser, Debug)]
#[command(version)]
struct VisualArgs {
/// Enable lockdown mode
/// Enable lockdown module
#[arg(short = 's', long, env = "TPU_TELEMETRY_LOCKDOWN_ENABLE")]
lockdown: bool,

/// Enable data mode
/// Enable audio module
#[arg(short = 'a', long, env = "TPU_TELEMETRY_AUDIBLE_ENABLE")]
audible: bool,

/// Enable data module
#[arg(short = 'd', long, env = "TPU_TELEMETRY_DATA_ENABLE")]
data: bool,

/// Enable video mode
/// Enable video module
#[arg(short = 'v', long, env = "TPU_TELEMETRY_VIDEO_ENABLE")]
video: bool,

Expand Down Expand Up @@ -85,6 +90,7 @@ async fn main() {
let (mqtt_sender_tx, mqtt_sender_rx) = mpsc::channel::<PublishableMessage>(1000);

let (hv_stat_send, hv_stat_recv) = watch::channel(false);
let (mute_stat_send, mute_stat_recv) = watch::channel(false);

let task_tracker = TaskTracker::new();
let token = CancellationToken::new();
Expand All @@ -98,23 +104,13 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(1)).await;
}

// use the passed in folder
let save_location = format!(
"{}/frontcam-{}-ner24.avi",
cli.output_folder,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);

let video_token = token.clone();
if cli.video {
task_tracker.spawn(run_save_pipeline(
video_token,
SavePipelineOpts {
video: cli.video_uri.clone(),
save_location,
save_location: cli.output_folder,
},
));
}
Expand All @@ -124,9 +120,9 @@ async fn main() {
token.clone(),
mqtt_sender_rx,
hv_stat_send,
mute_stat_send,
MqttProcessorOptions {
mqtt_path: cli.mqtt_url,
mqtt_recv: None,
},
);
let (client, eventloop) = AsyncClient::new(opts, 600);
Expand All @@ -140,7 +136,12 @@ async fn main() {

if cli.lockdown {
info!("Running lockdown module");
task_tracker.spawn(lockdown_runner(hv_stat_recv));
task_tracker.spawn(lockdown_runner(token.clone(), hv_stat_recv));
}

if cli.audible {
info!("Running audio module");
task_tracker.spawn(audible_manager(token.clone(), mute_stat_recv));
}

task_tracker.close();
Expand Down
36 changes: 15 additions & 21 deletions src/mqtt_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use rumqttc::v5::{
mqttbytes::{v5::Packet, QoS},
AsyncClient, Event, EventLoop, MqttOptions,
};
use tokio::sync::{mpsc::Receiver, watch::Sender, RwLock};
use tokio::sync::{mpsc::Receiver, watch::Sender};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};

use crate::{serverdata, PublishableMessage, HV_EN_TOPIC};
use crate::{serverdata, PublishableMessage, HV_EN_TOPIC, MUTE_EN_TOPIC};

/// The chief processor of incoming mqtt data, this handles
/// - mqtt state
Expand All @@ -21,16 +21,14 @@ use crate::{serverdata, PublishableMessage, HV_EN_TOPIC};
pub struct MqttProcessor {
cancel_token: CancellationToken,
mqtt_sender_rx: Receiver<PublishableMessage>,
mqtt_recv: Option<(String, Arc<RwLock<f32>>)>,
hv_stat_send: Sender<bool>,
mute_stat_send: Sender<bool>,
}

/// processor options, these are static immutable settings
pub struct MqttProcessorOptions {
/// URI of the mqtt server
pub mqtt_path: String,
/// MQTT topic and place to put data, or none
pub mqtt_recv: Option<(String, Arc<RwLock<f32>>)>,
}

impl MqttProcessor {
Expand All @@ -39,6 +37,7 @@ impl MqttProcessor {
cancel_token: CancellationToken,
mqtt_sender_rx: Receiver<PublishableMessage>,
hv_stat_send: Sender<bool>,
mute_stat_send: Sender<bool>,
opts: MqttProcessorOptions,
) -> (MqttProcessor, MqttOptions) {
// create the mqtt client and configure it
Expand Down Expand Up @@ -69,8 +68,8 @@ impl MqttProcessor {
MqttProcessor {
cancel_token,
mqtt_sender_rx,
mqtt_recv: opts.mqtt_recv,
hv_stat_send,
mute_stat_send,
},
mqtt_opts,
)
Expand All @@ -81,15 +80,6 @@ impl MqttProcessor {
/// * `client` - The async mqttt v5 client to use for subscriptions
pub async fn process_mqtt(mut self, client: Arc<AsyncClient>, mut eventloop: EventLoop) {
debug!("Subscribing to siren with inputted topic");
if self.mqtt_recv.as_ref().is_some() {
client
.subscribe(
self.mqtt_recv.as_ref().unwrap().0.clone(),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
)
.await
.expect("Could not subscribe to Siren");
}
client
.subscribe(HV_EN_TOPIC, rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
.await
Expand All @@ -111,14 +101,9 @@ impl MqttProcessor {
warn!("Could not parse topic, topic: {:?}", msg.topic);
continue;
};
if let Some(ref mqtt_recv) = self.mqtt_recv {
let mut d = mqtt_recv.1.write().await;
*d = *res.values.first().unwrap_or(&0f32);
}

let val = *res.values.first().unwrap_or(&-1f32) as u8;
match topic {
HV_EN_TOPIC => {
let val = *res.values.first().unwrap_or(&-1f32) as u8;
if val == 1 {
self.hv_stat_send.send(true).expect("HV Stat Channel Closed");
} else if val == 0 {
Expand All @@ -127,6 +112,15 @@ impl MqttProcessor {
warn!("Received bad HV message!");
}
},
MUTE_EN_TOPIC => {
if val == 1 {
self.mute_stat_send.send(true).expect("Mute Stat Channel Closed");
} else if val == 0 {
self.mute_stat_send.send(false).expect("Mute Stat Channel Closed");
} else {
warn!("Received bad mute message!");
}
},
_ => {
warn!("Unknown topic received: {}", topic);
}
Expand Down
39 changes: 26 additions & 13 deletions src/visual.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{error::Error, process::Stdio};
use std::{
error::Error,
process::Stdio,
time::{SystemTime, UNIX_EPOCH},
};

use tokio::process::Command;
use tokio_util::sync::CancellationToken;
Expand All @@ -16,6 +20,17 @@ pub async fn run_save_pipeline(
) -> Result<(), Box<dyn Error + Send + Sync>> {
// ffmpeg -f video4linux2 -input_format mjpeg -s 1280x720 -i /dev/video0 -vf "drawtext=fontfile=FreeSerif.tff: \
//text='%{localtime\:%T}': [email protected]: x=7: y=700" -vcodec libx264 -preset veryfast -f mp4 -pix_fmt yuv420p -y output.mp4

// use the passed in folder
let save_location = format!(
"{}/frontcam-{}-ner24.avi",
vid_opts.save_location,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);

info!("Creating and launching ffmpeg...");
let mut res = Command::new("ffmpeg").args([
"-f",
Expand All @@ -37,20 +52,18 @@ pub async fn run_save_pipeline(
"-pix_fmt",
"yuv420p",
"-y",
&vid_opts.save_location
&save_location
]).stdin(Stdio::null()).spawn()?;

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Ffmpeg canceled");
res.wait().await.unwrap();
return Ok(())
},
_ = res.wait() => {
warn!("Ffmpeg ended early!");
return Ok(())
}
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Ffmpeg canceled");
res.wait().await.unwrap();
Ok(())
},
_ = res.wait() => {
warn!("Ffmpeg ended early!");
Ok(())
}
}
}

0 comments on commit dd990ed

Please sign in to comment.