-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #9 from Juseus/main
Add sniffer example for logging CAN-bus messages
- Loading branch information
Showing
5 changed files
with
173 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# CAN Sniffer | ||
|
||
This example lets you log raw CAN messages directly to .csv file via serial | ||
port. Example consist of two separate parts, the STM32 example code and a | ||
python script. The python script connects to the serial port and outputs the | ||
CAN messages in .csv format. | ||
|
||
The Python code supports both Linux and Windows. | ||
|
||
## Prerequisites | ||
|
||
Working Python 3 installation with PIP. Tested with Python 3.10 and 3.11. | ||
|
||
To install required Python modules: | ||
|
||
``` | ||
pip install -r requirements.txt | ||
``` | ||
|
||
## How to use. | ||
|
||
Select the correct pin-configuration in the example (see the main README.md in | ||
the root of this repository), compile and upload to STM32 board. | ||
|
||
Once the uploaded example is running and CAN is connected, run the Python | ||
code: | ||
|
||
``` | ||
python3 stm32_can_to_csv.py -p PORT | ||
``` | ||
|
||
The PORT being one of COMx | /dev/ttySx | /dev/ttyACMx, depending on your | ||
platfrom and setup. | ||
|
||
To see all possible command line options: | ||
|
||
``` | ||
python3 stm32_can_to_csv.py --help | ||
``` | ||
|
||
By default, the code will output **can-log_DATE-TIME.csv** files in the current | ||
directory. Use **CTRL+C** to abort logging at any time. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#if !defined(HAL_CAN_MODULE_ENABLED) | ||
#define HAL_CAN_MODULE_ENABLED | ||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pyserial |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
#include "STM32_CAN.h" | ||
//STM32_CAN Can1( CAN1, DEF ); | ||
//STM32_CAN Can1( CAN1, ALT ); | ||
STM32_CAN Can1( CAN1, ALT_2 ); | ||
|
||
static CAN_message_t CAN_RX_msg; | ||
|
||
void setup() { | ||
Serial.begin(115200); | ||
Can1.begin(); | ||
Can1.setBaudRate(500000); | ||
} | ||
|
||
void loop() { | ||
if (Can1.read(CAN_RX_msg)) { | ||
|
||
// Timestamp | ||
Serial.print(micros()); | ||
Serial.print(";"); | ||
|
||
// ID | ||
Serial.print(CAN_RX_msg.id, HEX); | ||
Serial.print(";"); | ||
|
||
// Length | ||
Serial.print(CAN_RX_msg.len); | ||
Serial.print(";"); | ||
|
||
// Data | ||
if (CAN_RX_msg.flags.remote == false) { | ||
for(int i=0; i<CAN_RX_msg.len; i++) { | ||
Serial.print("0x"); | ||
Serial.print(CAN_RX_msg.buf[i], HEX); | ||
if (i != (CAN_RX_msg.len-1)) | ||
Serial.print(";"); | ||
} | ||
Serial.println(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import time | ||
import csv | ||
import argparse | ||
import sys | ||
import logging | ||
from signal import signal, SIGINT | ||
import serial | ||
|
||
CSV_HEADER = ["Timestamp", "ID", "Length", "Byte 0", "Byte 1", "Byte 2", "Byte 3", "Byte 4", "Byte 5", "Byte 6", "Byte 7"] | ||
TIME_FORMAT = "%Y%m%d-%H%M%S" | ||
BUFFER = [] | ||
OUTPUT_FILE = f"can-bus_{time.strftime(TIME_FORMAT)}.csv" | ||
|
||
def _sigint_handler(signal_received, frame): | ||
if len(BUFFER) > 0: | ||
write_to_csv(BUFFER, OUTPUT_FILE) | ||
sys.exit(0) | ||
|
||
def arg_parser(args): | ||
parser = argparse.ArgumentParser(description="Parses command.") | ||
parser.add_argument("-p", "--port", help="Serial port (example: COM5 or /dev/ttyS1)") | ||
parser.add_argument("-b", "--baudrate", help="Serial baudrate (default: 115200)", default=115200, type=int) | ||
parser.add_argument("--BUFFER", help="Write BUFFER length (default: 2500)", default=2500, type=int) | ||
parser.add_argument("--debug", help="Debug mode", action="store_true") | ||
|
||
# no arguments given | ||
if len(sys.argv) == 1: | ||
parser.print_help(sys.stderr) | ||
sys.exit(1) | ||
|
||
options = parser.parse_args(args) | ||
return options | ||
|
||
def write_to_csv(buffer, output_file): | ||
with open(output_file, "a", newline="", encoding="utf-8") as fd: | ||
writer = csv.writer(fd) | ||
for row in buffer: | ||
writer.writerow(row) | ||
|
||
def main(): | ||
global BUFFER | ||
|
||
# parse arguments | ||
opt = arg_parser(sys.argv[1:]) | ||
|
||
# set log level | ||
log_level = logging.INFO | ||
if opt.debug: | ||
log_level = logging.DEBUG | ||
logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s") | ||
|
||
# write header | ||
write_to_csv([CSV_HEADER], OUTPUT_FILE) | ||
|
||
# initialize serial | ||
logging.info("Waiting for serial port %s", opt.port) | ||
while True: | ||
try: | ||
ser = serial.Serial(opt.port, opt.baudrate) | ||
ser.flushInput() | ||
logging.info("Serial port initialization done") | ||
break | ||
except (OSError, serial.SerialException): | ||
time.sleep(1) | ||
|
||
logging.info("Starting logging... Press CTRL+C anytime to quit") | ||
while ser.is_open: | ||
try: | ||
ser_bytes = ser.readline().decode("utf-8").strip().split(";") | ||
BUFFER.append(ser_bytes) | ||
logging.debug(ser_bytes) | ||
# write file after every X lines | ||
if len(BUFFER) == opt.BUFFER: | ||
write_to_csv(BUFFER, OUTPUT_FILE) | ||
BUFFER = [] | ||
except Exception as err: | ||
write_to_csv(BUFFER, OUTPUT_FILE) | ||
logging.error(err) | ||
break | ||
|
||
logging.info("Logging ended") | ||
|
||
if __name__ == '__main__': | ||
signal(SIGINT, _sigint_handler) | ||
main() |