Skip to content

Commit

Permalink
Merge pull request #649 from LibreQoE/rust2024
Browse files Browse the repository at this point in the history
Update to Rust 2024 Edition
  • Loading branch information
rchac authored Feb 26, 2025
2 parents 9b74f0c + e5e649f commit 5c1edc6
Show file tree
Hide file tree
Showing 337 changed files with 11,667 additions and 10,268 deletions.
895 changes: 513 additions & 382 deletions src/rust/Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lqos_rs"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "GPL-2.0-only"

[profile.release]
Expand Down Expand Up @@ -75,5 +75,5 @@ crossbeam-queue = "0.3.11"
arc-swap = "1.7.1"

# May have to change this one for ARM?
jemallocator = "0.5"
#jemallocator = "0.5"
mimalloc = "0.1.43"
3 changes: 1 addition & 2 deletions src/rust/lqos_anonymous_stats_server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
[package]
name = "lqos_anonymous_stats_server"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "GPL-2.0-only"

[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
anyhow = { workspace = true }
env_logger = "0"
tracing = { workspace = true }
lqos_bus = { path = "../lqos_bus" }
serde_cbor = { workspace = true }
sqlite = "0.30.4"
Expand Down
98 changes: 57 additions & 41 deletions src/rust/lqos_anonymous_stats_server/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{path::Path, sync::atomic::AtomicI64, time::SystemTime};
use lqos_bus::anonymous::AnonymousUsageV1;
use sqlite::{Value, State};
use sqlite::{State, Value};
use std::{path::Path, sync::atomic::AtomicI64, time::SystemTime};
const DBPATH: &str = "anonymous.sqlite";

const SETUP_QUERY: &str =
"CREATE TABLE nodes (
const SETUP_QUERY: &str = "CREATE TABLE nodes (
node_id TEXT PRIMARY KEY,
last_seen INTEGER NOT NULL
);
Expand Down Expand Up @@ -78,40 +77,35 @@ pub fn check_id() {
if let Some(val) = value {
if let Ok(n) = val.parse::<i64>() {
log::info!("Last id: {n}");
SUBMISSION_ID.store(n+1, std::sync::atomic::Ordering::Relaxed);
SUBMISSION_ID.store(n + 1, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
true
}).unwrap();
})
.unwrap();
} else {
panic!("Unable to connect to database");
}
}

const INSERT_STATS: &str =
"INSERT INTO submissions VALUES (
const INSERT_STATS: &str = "INSERT INTO submissions VALUES (
:id, :date, :node_id, :ip_address, :git_hash, :using_xdp_bridge, :on_a_stick,
:total_memory, :available_memory, :kernel_version, :distro, :usable_cores,
:cpu_brand, :cpu_vendor, :cpu_frequency, :sqm, :monitor_mode, :capcity_down,
:capacity_up, :genereated_pdn_down, :generated_pdn_up, :shaped_device_count,
:net_json_len, :peak_down, :peak_up
);";

const INSERT_NIC: &str =
"INSERT INTO nics
const INSERT_NIC: &str = "INSERT INTO nics
(parent, description, product, vendor, clock, capacity)
VALUES (
:parent, :description, :product, :vendor, :clock, :capacity
);";

fn bool_to_n(x: bool) -> i64 {
if x {
1
} else {
0
}
if x { 1 } else { 0 }
}

fn get_sys_time_in_secs() -> u64 {
Expand All @@ -132,7 +126,10 @@ pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<(
(":node_id", stats.node_id.clone().into()),
(":ip_address", ip.into()),
(":git_hash", stats.git_hash.clone().into()),
(":using_xdp_bridge", bool_to_n(stats.using_xdp_bridge).into()),
(
":using_xdp_bridge",
bool_to_n(stats.using_xdp_bridge).into(),
),
(":on_a_stick", bool_to_n(stats.on_a_stick).into()),
(":total_memory", (stats.total_memory as i64).into()),
(":available_memory", (stats.available_memory as i64).into()),
Expand All @@ -146,9 +143,18 @@ pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<(
(":monitor_mode", bool_to_n(stats.monitor_mode).into()),
(":capcity_down", (stats.total_capacity.0 as i64).into()),
(":capacity_up", (stats.total_capacity.1 as i64).into()),
(":genereated_pdn_down", (stats.generated_pdn_capacity.0 as i64).into()),
(":generated_pdn_up", (stats.generated_pdn_capacity.1 as i64).into()),
(":shaped_device_count", (stats.shaped_device_count as i64).into()),
(
":genereated_pdn_down",
(stats.generated_pdn_capacity.0 as i64).into(),
),
(
":generated_pdn_up",
(stats.generated_pdn_capacity.1 as i64).into(),
),
(
":shaped_device_count",
(stats.shaped_device_count as i64).into(),
),
(":net_json_len", (stats.net_json_len as i64).into()),
(":peak_down", (stats.high_watermark_bps.0 as i64).into()),
(":peak_up", (stats.high_watermark_bps.0 as i64).into()),
Expand All @@ -171,9 +177,7 @@ pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<(
// Find out if its a new host
let mut found = false;
let mut statement = cn.prepare("SELECT * FROM nodes WHERE node_id=:id")?;
statement.bind_iter::<_, (_, Value)>([
(":id", stats.node_id.clone().into()),
])?;
statement.bind_iter::<_, (_, Value)>([(":id", stats.node_id.clone().into())])?;
while let Ok(State::Row) = statement.next() {
found = true;
}
Expand All @@ -187,7 +191,8 @@ pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<(
statement.next()?;
} else {
log::info!("New host: {}", stats.node_id);
let mut statement = cn.prepare("INSERT INTO nodes (node_id, last_seen) VALUES(:id, :date)")?;
let mut statement =
cn.prepare("INSERT INTO nodes (node_id, last_seen) VALUES(:id, :date)")?;
statement.bind_iter::<_, (_, Value)>([
(":id", stats.node_id.clone().into()),
(":date", date.into()),
Expand All @@ -210,40 +215,51 @@ pub fn dump_all_to_string() -> anyhow::Result<String> {
}
result += "\n";
true
}).unwrap();
})
.unwrap();
Ok(result)
}

pub fn count_unique_node_ids() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT COUNT(DISTINCT node_id) AS count FROM nodes;", |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
cn.iterate(
"SELECT COUNT(DISTINCT node_id) AS count FROM nodes;",
|pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
}
true
}).unwrap();
true
},
)
.unwrap();
Ok(result)
}

pub fn count_unique_node_ids_this_week() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
let last_week = (get_sys_time_in_secs() - 604800).to_string();
cn.iterate(format!("SELECT COUNT(DISTINCT node_id) AS count FROM nodes WHERE last_seen > {last_week};"), |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
cn.iterate(
format!(
"SELECT COUNT(DISTINCT node_id) AS count FROM nodes WHERE last_seen > {last_week};"
),
|pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
}
true
}).unwrap();
true
},
)
.unwrap();
Ok(result)
}

Expand Down Expand Up @@ -293,4 +309,4 @@ pub fn bandwidth() -> anyhow::Result<u64> {
true
}).unwrap();
Ok(result)
}
}
24 changes: 11 additions & 13 deletions src/rust/lqos_anonymous_stats_server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
mod stats_server;
mod db;
mod stats_server;
mod webserver;
use tokio::spawn;


#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start the logger
env_logger::init_from_env(
env_logger::Env::default()
.filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);
// Start the logger
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);

db::create_if_not_exist();
db::check_id();
db::create_if_not_exist();
db::check_id();

spawn(webserver::stats_viewer());
spawn(webserver::stats_viewer());

let _ = stats_server::gather_stats().await;
Ok(())
}
let _ = stats_server::gather_stats().await;
Ok(())
}
92 changes: 42 additions & 50 deletions src/rust/lqos_anonymous_stats_server/src/stats_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,55 @@ use tokio::{io::AsyncReadExt, net::TcpListener, spawn};
use crate::db::insert_stats_dump;

pub async fn gather_stats() -> anyhow::Result<()> {
let listener = TcpListener::bind(":::9125").await?;
log::info!("Listening on :::9125");
let listener = TcpListener::bind(":::9125").await?;
log::info!("Listening on :::9125");

loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = vec![0; 10240];
if let Ok(n) = socket.read(&mut buf).await {
log::info!("Received {n} bytes from {address:?}");
if let Err(e) = decode(&buf, address).await {
log::error!("Decode error from {address:?}");
log::error!("{e:?}");
}
}
});
}
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = vec![0; 10240];
if let Ok(n) = socket.read(&mut buf).await {
log::info!("Received {n} bytes from {address:?}");
if let Err(e) = decode(&buf, address).await {
log::error!("Decode error from {address:?}");
log::error!("{e:?}");
}
}
});
}
}

async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<()> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");

match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<AnonymousUsageV1, _> =
serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => store_stats_v1(&payload, address).await,
Err(e) => {
log::error!(
"Unable to deserialize statistics sent from {address:?}"
);
log::error!("{e:?}");
Err(anyhow::Error::msg("Deserialize error"))
match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<AnonymousUsageV1, _> = serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => store_stats_v1(&payload, address).await,
Err(e) => {
log::error!("Unable to deserialize statistics sent from {address:?}");
log::error!("{e:?}");
Err(anyhow::Error::msg("Deserialize error"))
}
}
}
_ => {
log::error!("Unknown version of statistics: {version}, dumped {size} bytes");
Err(anyhow::Error::msg("Version error"))
}
}
}
_ => {
log::error!(
"Unknown version of statistics: {version}, dumped {size} bytes"
);
Err(anyhow::Error::msg("Version error"))
}
}
}

async fn store_stats_v1(
payload: &AnonymousUsageV1,
address: SocketAddr,
) -> anyhow::Result<()> {
insert_stats_dump(payload, &address.to_string())?;
Ok(())
async fn store_stats_v1(payload: &AnonymousUsageV1, address: SocketAddr) -> anyhow::Result<()> {
insert_stats_dump(payload, &address.to_string())?;
Ok(())
}
Loading

0 comments on commit 5c1edc6

Please sign in to comment.