Skip to content

Commit

Permalink
[ISO-TP] implement Separation Time (STmin) (#35)
Browse files Browse the repository at this point in the history
* parse out flow control config

* wait between CF

* better asynccan debugging

* add isotp example that triggers flow control send

* remove debug print

* add stmin override
  • Loading branch information
pd0wm authored Mar 15, 2024
1 parent 276aa86 commit fb650f5
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 59 deletions.
5 changes: 5 additions & 0 deletions examples/isotp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ async fn main() {
isotp.send(&long_request).await.unwrap();
let response = stream.next().await.unwrap().unwrap();
println!("RX: {}", hex::encode(response));

let long_response = [0x22, 0xf1, 0x81];
isotp.send(&long_response).await.unwrap();
let response = stream.next().await.unwrap().unwrap();
println!("RX: {}", hex::encode(response));
}
10 changes: 10 additions & 0 deletions src/async_can.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use crate::can::Identifier;
use async_stream::stream;
use futures_core::stream::Stream;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::debug;

const CAN_TX_BUFFER_SIZE: usize = 128;
const CAN_RX_BUFFER_SIZE: usize = 1024;
const DEBUG: bool = false;

type BusIdentifier = (u8, Identifier);
type FrameCallback = (Frame, oneshot::Sender<()>);
Expand All @@ -27,6 +29,10 @@ fn process<T: CanAdapter>(
while shutdown_receiver.try_recv().is_err() {
let frames: Vec<Frame> = adapter.recv().unwrap();
for frame in frames {
if DEBUG {
debug! {"RX {:?}", frame};
}

// Wake up sender
if frame.loopback {
let callback = callbacks
Expand Down Expand Up @@ -60,6 +66,10 @@ fn process<T: CanAdapter>(
.or_insert_with(VecDeque::new)
.push_back((loopback_frame, callback));

if DEBUG {
debug! {"TX {:?}", frame};
}

buffer.push(frame);
}
if !buffer.is_empty() {
Expand Down
58 changes: 40 additions & 18 deletions src/can.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Generic CAN types and traits
use std::fmt;

/// Identifier for a CAN frame
#[derive(Debug, Copy, Clone, PartialOrd, Eq, PartialEq, Hash)]
#[derive(Copy, Clone, PartialOrd, Eq, PartialEq, Hash)]
pub enum Identifier {
Standard(u32),
Extended(u32),
Expand All @@ -19,8 +21,36 @@ impl Identifier {
}
}

impl fmt::Debug for Identifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Identifier::Extended(id) => write!(f, "0x{:08x}", id),
Identifier::Standard(id) => write!(f, "0x{:03x}", id),
}
}
}

impl From<u32> for Identifier {
fn from(id: u32) -> Identifier {
if id <= 0x7ff {
Identifier::Standard(id)
} else {
Identifier::Extended(id)
}
}
}

impl From<Identifier> for u32 {
fn from(val: Identifier) -> u32 {
match val {
Identifier::Standard(id) => id,
Identifier::Extended(id) => id,
}
}
}

/// A CAN frame
#[derive(Debug, Clone, PartialEq)]
#[derive(Clone, PartialEq)]
pub struct Frame {
/// The bus index for adapters supporting multiple CAN busses
pub bus: u8,
Expand All @@ -45,22 +75,14 @@ impl Frame {
}
}

impl From<u32> for Identifier {
fn from(id: u32) -> Identifier {
if id <= 0x7ff {
Identifier::Standard(id)
} else {
Identifier::Extended(id)
}
}
}

impl From<Identifier> for u32 {
fn from(val: Identifier) -> u32 {
match val {
Identifier::Standard(id) => id,
Identifier::Extended(id) => id,
}
impl fmt::Debug for Frame {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Frame")
.field("bus", &self.bus)
.field("id", &self.id)
.field("data", &hex::encode(&self.data))
.field("loopback", &self.loopback)
.finish()
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ pub enum Error {
#[error("Timeout")]
Timeout,
#[error(transparent)]
IsoTPError(crate::isotp::error::Error),
IsoTPError(#[from] crate::isotp::error::Error),
#[error(transparent)]
LibUsbError(#[from] rusb::Error),
#[error(transparent)]
PandaError(crate::panda::error::Error),
PandaError(#[from] crate::panda::error::Error),
#[error(transparent)]
UDSError(crate::uds::error::Error),
UDSError(#[from] crate::uds::error::Error),
}

impl From<tokio_stream::Elapsed> for Error {
Expand Down
26 changes: 12 additions & 14 deletions src/isotp/constants.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
#[derive(Debug, PartialEq, Copy, Clone)]
use strum_macros::FromRepr;

pub static FRAME_TYPE_MASK: u8 = 0xf0;
pub static FLOW_SATUS_MASK: u8 = 0x0f;

#[derive(Debug, PartialEq, Copy, Clone, FromRepr)]
#[repr(u8)]
pub enum FrameType {
Single = 0x00,
First = 0x10,
Consecutive = 0x20,
FlowControl = 0x30,
Unknown = 0xff,
}

pub static FRAME_TYPE_MASK: u8 = 0xf0;

impl From<u8> for FrameType {
fn from(val: u8) -> FrameType {
match val {
0x00 => FrameType::Single,
0x10 => FrameType::First,
0x20 => FrameType::Consecutive,
0x30 => FrameType::FlowControl,
_ => FrameType::Unknown,
}
}
#[derive(Debug, PartialEq, Copy, Clone, FromRepr)]
#[repr(u8)]
pub enum FlowStatus {
ContinueToSend = 0x0,
Wait = 0x1,
Overflow = 0x2,
}
2 changes: 2 additions & 0 deletions src/isotp/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub enum Error {
DataTooLarge,
#[error("Flow Control")]
FlowControl,
#[error("Overflow")]
Overflow,
#[error("Out Of Order")]
OutOfOrder,
#[error("Unknown Frame Type")]
Expand Down
88 changes: 64 additions & 24 deletions src/isotp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
pub mod constants;
pub mod error;
pub mod types;

use crate::async_can::AsyncCanAdapter;
use crate::can::Frame;
use crate::can::Identifier;
use crate::error::Error;
use crate::isotp::constants::FlowStatus;
use crate::isotp::constants::FLOW_SATUS_MASK;
use crate::isotp::constants::{FrameType, FRAME_TYPE_MASK};

use async_stream::stream;
use futures_core::stream::Stream;
use tokio_stream::{StreamExt, Timeout};
use tracing::debug;

use self::types::FlowControlConfig;

const DEFAULT_TIMEOUT_MS: u64 = 100;

/// Configuring passed to the IsoTPAdapter.
Expand All @@ -43,6 +48,8 @@ pub struct IsoTPConfig {
pub padding: Option<u8>,
/// Max timeout for receiving a frame
pub timeout: std::time::Duration,
/// Override for Seperation Time (STmin) for transmitted frames
pub separation_time_min: Option<std::time::Duration>,
}

impl IsoTPConfig {
Expand Down Expand Up @@ -78,6 +85,7 @@ impl IsoTPConfig {
tx_dl: 8,
padding: Some(0xaa),
timeout: std::time::Duration::from_millis(DEFAULT_TIMEOUT_MS),
separation_time_min: None,
}
}
}
Expand Down Expand Up @@ -144,6 +152,27 @@ impl<'a> IsoTPAdapter<'a> {
self.adapter.send(&frame).await;
}

fn receive_flow_control(&self, frame: &Frame) -> Result<FlowControlConfig, Error> {
// Check if Flow Control
if FrameType::from_repr(frame.data[0] & FRAME_TYPE_MASK) != Some(FrameType::FlowControl) {
return Err(crate::isotp::error::Error::FlowControl.into());
};

// Check Flow Status
match FlowStatus::from_repr(frame.data[0] & FLOW_SATUS_MASK) {
Some(FlowStatus::ContinueToSend) => {} // Ok
Some(FlowStatus::Wait) => unimplemented!("Wait flow control not implemented"),
Some(FlowStatus::Overflow) => return Err(crate::isotp::error::Error::Overflow.into()),
None => return Err(crate::isotp::error::Error::MalformedFrame.into()),
};

// Parse block size and separation time
let config = types::FlowControlConfig::try_from(frame)?;

debug!("RX FC, {:?} data {}", config, hex::encode(&frame.data));
Ok(config)
}

async fn send_multiple(&self, data: &[u8]) -> Result<(), Error> {
// Stream for receiving flow control
let stream = self
Expand All @@ -154,14 +183,26 @@ impl<'a> IsoTPAdapter<'a> {

self.send_first_frame(data).await;
let frame = stream.next().await.unwrap()?;
if frame.data[0] & FRAME_TYPE_MASK != FrameType::FlowControl as u8 {
return Err(Error::IsoTPError(crate::isotp::error::Error::FlowControl));
};
let fc_config = self.receive_flow_control(&frame)?;

debug!("RX FC, data {}", hex::encode(&frame.data));

// Check for separation time override
let st_min = match self.config.separation_time_min {
Some(st_min) => st_min,
None => fc_config.separation_time_min,
};

let chunks = data[self.config.tx_dl - 2..].chunks(self.config.tx_dl - 1);
for (idx, chunk) in chunks.enumerate() {
let mut it = chunks.enumerate().peekable();
while let Some((idx, chunk)) = it.next() {
self.send_consecutive_frame(chunk, idx).await;

// Sleep for separation time between frames
let last = it.peek().is_none();
if !last {
tokio::time::sleep(st_min).await;
}
}

Ok(())
Expand All @@ -176,31 +217,32 @@ impl<'a> IsoTPAdapter<'a> {
} else if data.len() <= 4095 {
self.send_multiple(data).await?;
} else {
return Err(Error::IsoTPError(crate::isotp::error::Error::DataTooLarge));
return Err(crate::isotp::error::Error::DataTooLarge.into());
}

Ok(())
}
async fn recv_single_frame(&self, frame: Frame) -> Result<Vec<u8>, Error> {
async fn recv_single_frame(&self, frame: &Frame) -> Result<Vec<u8>, Error> {
let len = (frame.data[0] & 0xF) as usize;
if len == 0 {
// unimplemented!("CAN FD escape sequence for single frame not supported");
return Err(Error::IsoTPError(
crate::isotp::error::Error::MalformedFrame,
));
unimplemented!("CAN FD escape sequence for single frame not supported");
}

debug!("RX SF, length: {} data {}", len, hex::encode(&frame.data));

Ok(frame.data[1..len + 1].to_vec())
}

async fn recv_first_frame(&self, frame: Frame, buf: &mut Vec<u8>) -> Result<usize, Error> {
async fn recv_first_frame(&self, frame: &Frame, buf: &mut Vec<u8>) -> Result<usize, Error> {
let b0 = frame.data[0] as u16;
let b1 = frame.data[1] as u16;
let len = ((b0 << 8 | b1) & 0xFFF) as usize;

debug!("RX FF, length: {}, data {}", len, hex::encode(&frame.data));
if len == 0 {
unimplemented!("CAN FD escape sequence for first frame not supported");
}


buf.extend(&frame.data[2..]);

Expand All @@ -218,7 +260,7 @@ impl<'a> IsoTPAdapter<'a> {

async fn recv_consecutive_frame(
&self,
frame: Frame,
frame: &Frame,
buf: &mut Vec<u8>,
len: usize,
idx: u8,
Expand All @@ -236,7 +278,7 @@ impl<'a> IsoTPAdapter<'a> {
);

if msg_idx != idx {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
return Err(crate::isotp::error::Error::OutOfOrder.into());
}

let new_idx = if idx == 0xF { 0 } else { idx + 1 };
Expand All @@ -254,21 +296,21 @@ impl<'a> IsoTPAdapter<'a> {

while let Some(frame) = stream.next().await {
let frame = frame?;
match (frame.data[0] & FRAME_TYPE_MASK).into() {
FrameType::Single => {
return Ok(self.recv_single_frame(frame).await?);
match FrameType::from_repr(frame.data[0] & FRAME_TYPE_MASK) {
Some(FrameType::Single) => {
return self.recv_single_frame(&frame).await;
}
FrameType::First => {
Some(FrameType::First) => {
// If we already received a first frame, something went wrong
if len.is_some() {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
}
len = Some(self.recv_first_frame(frame, &mut buf).await?);
len = Some(self.recv_first_frame(&frame, &mut buf).await?);
}
FrameType::Consecutive => {
Some(FrameType::Consecutive) => {
if let Some(len) = len {
idx = self
.recv_consecutive_frame(frame, &mut buf, len, idx)
.recv_consecutive_frame(&frame, &mut buf, len, idx)
.await?;
if buf.len() >= len {
return Ok(buf);
Expand All @@ -277,11 +319,9 @@ impl<'a> IsoTPAdapter<'a> {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
}
}
FrameType::FlowControl => {} // Ignore flow control frames, these are from a simultaneous transmission
Some(FrameType::FlowControl) => {} // Ignore flow control frames, these are from a simultaneous transmission
_ => {
return Err(Error::IsoTPError(
crate::isotp::error::Error::UnknownFrameType,
));
return Err(crate::isotp::error::Error::UnknownFrameType.into());
}
};
}
Expand Down
Loading

0 comments on commit fb650f5

Please sign in to comment.