Skip to content

Commit

Permalink
Bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
RainerZ committed Sep 7, 2024
1 parent 5bd67ca commit e4dfd3d
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 64 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ tmp.a2l
.DS_Store
/a2lfile.txt
*.bin
/tokio_demo.a2l
3 changes: 3 additions & 0 deletions examples/tokio_demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ chrono = "0.4.38"
lazy_static = "1.4.0"
tokio = { version = "1.37.0", features = ["full"] }

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

xcp = { path = "../../", features = ["auto_reg","json"] }
xcp_type_description = { path = "../../xcp_type_description/"}
xcp_type_description_derive = { path = "../../xcp_type_description/xcp_type_description_derive/" }
Expand Down
150 changes: 138 additions & 12 deletions examples/tokio_demo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,67 @@ mod xcp_server;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};

use core::f64::consts::PI;
use serde::{Deserialize, Serialize};
use std::error::Error;
use tokio::{join, time::sleep};

use xcp::*;
use xcp_type_description::prelude::*;

//-----------------------------------------------------------------------------
// Demo calibration parameters (static)

struct CalPage {
run: bool,
}

static CAL_PAGE: once_cell::sync::OnceCell<CalPage> = once_cell::sync::OnceCell::with_value(CalPage { run: true });

struct CalPage0 {
task1_cycle_time_us: u32, // Cycle time of task1 in microseconds
task2_cycle_time_us: u32, // Cycle time of task2 in microseconds
task_count: u16, // Number of tasks
}

static CAL_PAGE0: once_cell::sync::OnceCell<CalPage0> = once_cell::sync::OnceCell::with_value(CalPage0 {
task1_cycle_time_us: 100000, // 100ms
task2_cycle_time_us: 1000, // 1ms
task_count: 10,
});

//-----------------------------------------------------------------------------
// Demo calibration parameters (dynamic)

// Define a struct with calibration parameters
#[derive(Debug, Clone, Copy, Serialize, Deserialize, XcpTypeDescription)]
struct CalPage1 {
#[type_description(comment = "Amplitude of the sine signal")]
#[type_description(unit = "Volt")]
#[type_description(min = "0")]
#[type_description(max = "500")]
ampl: f64,

#[type_description(comment = "Period of the sine signal")]
#[type_description(unit = "s")]
#[type_description(min = "0.001")]
#[type_description(max = "10")]
period: f64,

#[type_description(comment = "Counter maximum value")]
#[type_description(min = "0")]
#[type_description(max = "255")]
counter_max: u32,
}

// Default calibration values
// This will be the FLASH page in the calibration memory segment
const CAL_PAGE1: CalPage1 = CalPage1 {
ampl: 100.0,
period: 5.0,
counter_max: 100,
};

//-----------------------------------------------------------------------------
// Asynchronous task, measures index, sleeps 100ms, measures -index and ends
// Demonstrates multi instance measurement
// There will be an event and an instance of index for each worker thread tokio uses
Expand All @@ -22,38 +80,106 @@ async fn task(task_index: u16) {
//daq_register!(index, event, "Task index", "");
// event.trigger();

sleep(tokio::time::Duration::from_millis(2)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
index = -index;

//event.trigger();

trace!("task {} end", index);
}

//-----------------------------------------------------------------------------
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
println!("xcp-lite tokio demo");

env_logger::Builder::new().filter_level(log::LevelFilter::Trace).init();
// Initialize logger
env_logger::Builder::new().filter_level(log::LevelFilter::Debug).init();

// Start tokio XCP server
let (rx_task, tx_task) = xcp_server::start_async_xcp_server("127.0.0.1:5555".to_string()).await?;
let xcp = Xcp::get();

// Create and register a static calibration parameter set
let calpage = CAL_PAGE.get().unwrap();
cal_register_static!(calpage.run, "stop maintask");
let calpage0 = CAL_PAGE0.get().unwrap();
cal_register_static!(calpage0.task1_cycle_time_us, "task1 cycle time", "us");
cal_register_static!(calpage0.task2_cycle_time_us, "task2 cycle time", "us");
cal_register_static!(calpage0.task_count, "task count");

// Create and register a calibration parameter set "calseg"
// This will define a MEMORY_SEGMENT named "calseg" in A2L
// Calibration segments have 2 pages, a constant default "FLASH" page and a mutable "RAM" page
// FLASH or RAM can be switched during runtime (XCP set_cal_page), saved to json (XCP freeze) freeze, reinitialized from FLASH (XCP copy_cal_page)
// The RAM page can be reloaded from a json file (load_json==true)
// If A2L is enabled (enable_a2l), the A2L description will be generated and provided for upload by CANape
let calseg = xcp.create_calseg(
"CalPage1", // name of the calibration segment and the .json file
&CAL_PAGE1, // default calibration values
true, // load RAM page from file "cal_seg".json
);

// Mainloop
trace!("Start");
let start_time = tokio::time::Instant::now();

let _xcp = xcp_server::start_server("127.0.0.1:5555".to_string()).await?;
// Measurement variable
let mut counter: u32 = 0;
let mut channel_1: f64 = 0.0;

trace!("Start");
// Create a measurement event with a unique name "task"
// This will apear as measurement mode in the CANape measurement configuration
let event = daq_create_event!("task");

// Register local variables "counter" and "channel_1" and associate them to event "task"
daq_register!(counter, event);
daq_register!(channel_1, event, "sine wave signal", "Volt");

// Main task loop
loop {
sleep(tokio::time::Duration::from_secs(1)).await;
// Stop
if !calpage.run {
break;
}

let mut tasks = Vec::new();
// Sleep for a calibratable amount of microseconds
tokio::time::sleep(tokio::time::Duration::from_micros(calpage0.task1_cycle_time_us as u64)).await;

const N: u16 = 100;
for i in 1..=N {
// Start a number of asynchronous tasks and wait for them to finish
let mut tasks = Vec::new();
for i in 1..=calpage0.task_count {
tasks.push(tokio::spawn(task(i)));
}
for t in tasks {
let _ = join!(t);
let _ = tokio::join!(t);
}

// A saw tooth counter with max from a calibration parameter
counter += 1;
if counter > calseg.counter_max {
counter = 0
}

// A sine signal with amplitude and period from calibration parameters
let time = start_time.elapsed().as_micros() as f64 * 0.000001; // s
channel_1 = calseg.ampl * (PI * time / calseg.period).sin();
let _channel_2 = channel_1;

// Triger the measurement event "task"
// The measurement event timestamp is taken here and captured data is sent to CANape
event.trigger();

// Synchronize calibration operations, if there are any
// All calibration (mutation of calseg) actions (download, page switch, freeze, init) on segment "calseg" happen here
calseg.sync();
}

//Xcp::get().write_a2l();
//Xcp::stop_server();
info!("Stop");

let _ = tokio::join!(rx_task);
let _ = tokio::join!(tx_task);

xcp.stop_server();
Ok(())
}
80 changes: 53 additions & 27 deletions examples/tokio_demo/src/xcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,77 @@ use std::error::Error;
use std::io;
use std::net::SocketAddr;

//use once_cell::sync::OnceCell;
use log::info;

use once_cell::sync::OnceCell;

//use tokio::join;
use tokio::net::UdpSocket;

use xcp::*;

#[derive(Debug)]
struct Server {
socket: UdpSocket,
buf: Vec<u8>,
xcp: &'static Xcp,
client: Option<(usize, SocketAddr)>,
}

//static SERVER: OnceCell<Server> = OnceCell::new();
#[derive(Debug)]
struct Client {
client: SocketAddr,
}

static ASYNC_XCP_SERVER: OnceCell<Server> = OnceCell::new();
static ASYNC_XCP_CLIENT: OnceCell<Client> = OnceCell::new();

async fn rx_task() -> Result<(), io::Error> {
let server = ASYNC_XCP_SERVER.get().unwrap();
let xcp = Xcp::get();
let mut buf = vec![0u8; 1024];
loop {
let res: (usize, SocketAddr) = server.socket.recv_from(&mut buf).await?;
info!("rx_task: recv {} bytes from {}, buf_len={}", res.0, res.1, buf.len());

if let Some(c) = ASYNC_XCP_CLIENT.get() {
assert_eq!(c.client, res.1);
} else {
ASYNC_XCP_CLIENT.set(Client { client: res.1 }).unwrap();
}

xcp.tl_command(&buf);
}
}

impl Server {
async fn run(mut self) -> Result<(), io::Error> {
loop {
let client: (usize, SocketAddr) = self.socket.recv_from(&mut self.buf).await?;
self.client = Some(client); // @@@@ check client changed
self.xcp.tl_command(&self.buf);
async fn tx_task() -> Result<(), io::Error> {
let server = ASYNC_XCP_SERVER.get().unwrap();

if let Some(buf) = self.xcp.tl_transmit_queue_peek() {
self.socket.send_to(buf, &self.client.unwrap().1).await?;
self.xcp.tl_transmit_queue_next();
}
let xcp = Xcp::get();
loop {
while let Some(buf) = xcp.tl_transmit_queue_peek() {
let client = ASYNC_XCP_CLIENT.get().unwrap();
server.socket.send_to(buf, &client.client).await?;
xcp.tl_transmit_queue_next();
info!("Sent {} bytes to {}", buf.len(), client.client);
}

tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
}
}

pub async fn start_server(addr: String) -> Result<&'static Xcp, Box<dyn Error>> {
pub async fn start_async_xcp_server(addr: String) -> Result<(tokio::task::JoinHandle<Result<(), io::Error>>, tokio::task::JoinHandle<Result<(), io::Error>>), Box<dyn Error>> {
let socket = UdpSocket::bind(&addr).await?;
println!("Bind to {}", socket.local_addr()?);

// Initialize the XCP driver transport layer only, not the server
let xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).start_protocol_layer().unwrap();
let _xcp = XcpBuilder::new("tokio_demo").set_log_level(XcpLogLevel::Debug).enable_a2l(true).start_protocol_layer().unwrap();

// Start the tokio server
let server = Server {
socket,
buf: vec![0; 1024],
xcp,
client: None,
};

// Run the server tasks
server.run().await?;
let server = Server { socket };
if ASYNC_XCP_SERVER.get().is_some() {
return Err("Server already started".into());
}
ASYNC_XCP_SERVER.set(server).unwrap();
let rx_task = tokio::spawn(rx_task());
let tx_task: tokio::task::JoinHandle<Result<(), io::Error>> = tokio::spawn(tx_task());

Ok(xcp)
Ok((rx_task, tx_task))
}
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,15 @@ fn main() {

// Trying to call `.init()` again would panic, because the StaticCell is already initialized.
// SOME_INT.init(42);

// Create calibration parameter sets
// Calibration segments have "static" lifetime, the Xcp singleton holds a smart pointer clone to each
// When a calibration segment is dropped by the application and sync is no longer called, the XCP tool will get a timeout when attempting to access it
// Calibration segments have 2 pages, a constant default "FLASH" page and a mutable "RAM" page
// FLASH or RAM can be switched during runtime (XCP set_cal_page), saved to json (XCP freeze), reinitialized from default FLASH page (XCP copy_cal_page)
// The initial RAM page can be loaded from a json file (load_json=true) or set to the default FLASH page (load_json=false)

// Create a alibration segment wrapper for CAL_PAGE, add fields manually to registry
// Create a calibration segment wrapper for CAL_PAGE, add fields manually to registry
let calseg = xcp.add_calseg(
"CalPage", // name of the calibration segment and the .json file
&CAL_PAGE, // default calibration values with static lifetime, trait bound from CalPageTrait must be possible
Expand Down
11 changes: 5 additions & 6 deletions xcp_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,21 @@ async fn main() {
let start_time = tokio::time::Instant::now();
xcp_client.start_measurement().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
xcp_client.stop_measurement().await.unwrap();
xcp_client.stop_measurement().await.unwrap_or_else(|e| error!("Stop measurement failed: {}", e));
let elapsed_time = start_time.elapsed().as_micros();
let event_count = daq_decoder.lock().unwrap().event_count;
info!("Measurement done, {} events, {:.0} event/s", event_count, event_count as f64 * 1_000_000.0 / elapsed_time as f64);
assert_ne!(event_count, 0);

// Stop demo task
// Create a calibration object for CalPage1.counter_max

if let Ok(run) = xcp_client.create_calibration_object("CalPage.run").await {
// Create a calibration object for CalPage.run
if let Ok(run) = xcp_client.create_calibration_object("calpage.run").await {
let v = xcp_client.get_value_u64(run);
info!("CalPage.run = {}", v);
info!("calpage.run = {}", v);
assert_eq!(v, 1);
xcp_client.set_value_u64(run, 0).await.unwrap();
} else {
warn!("CalPage.run not found");
warn!("calpage.run not found");
}

// Disconnect
Expand Down
Loading

0 comments on commit e4dfd3d

Please sign in to comment.