Skip to content

Commit

Permalink
fix(jx): improve reliability of smartcard detection and status stream (
Browse files Browse the repository at this point in the history
…#97)

Replaces `try_stream!` with a broadcast stream. This means that no matter how many requests we get for the status stream endpoint we only collect the data for it in one place. Previously, the loop inside the `try_stream!` would run even after the connection was closed, resulting in excess resource usage.
  • Loading branch information
eventualbuddha authored Mar 28, 2024
1 parent faa2cd7 commit f9f6dbc
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 146 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dioxus-router = "0.4.1"
dioxus-web = { version = "0.4" }
dotenvy = "0.15.7"
env_logger = "0.10.0"
futures = "0.3.28"
futures-core = "0.3.28"
hex = "0.4.3"
hmac-sha256 = "1.1.7"
Expand Down Expand Up @@ -64,7 +65,7 @@ tokio = { version = "1.29.1", default-features = false, features = [
"sync",
"macros",
] }
tokio-stream = "0.1.14"
tokio-stream = { version = "0.1.14", features = ["sync"] }
tower-http = { version = "0.4.3", features = ["fs"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
Expand Down
1 change: 1 addition & 0 deletions apps/cacvote-jx-terminal/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ clap = { workspace = true }
color-eyre = { workspace = true }
dotenvy = { workspace = true }
futures-core = { workspace = true }
futures = { workspace = true }
openssl = { workspace = true }
pcsc = { workspace = true }
regex = { workspace = true }
Expand Down
109 changes: 78 additions & 31 deletions apps/cacvote-jx-terminal/backend/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::convert::Infallible;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;

use async_stream::try_stream;
use auth_rs::card_details::CardDetailsWithAuthInfo;
use axum::body::Bytes;
use axum::response::sse::{Event, KeepAlive};
Expand All @@ -16,22 +15,27 @@ use axum::routing::post;
use axum::Json;
use axum::{extract::DefaultBodyLimit, routing::get, Router};
use axum::{extract::State, http::StatusCode, response::IntoResponse};
use futures_core::Stream;
use futures::stream::Stream;
use serde_json::json;
use sqlx::PgPool;
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tower_http::services::{ServeDir, ServeFile};
use tower_http::trace::TraceLayer;
use tracing::Level;
use types_rs::cacvote::{AuthStatus, Election, Payload, SessionData, SignedObject};
use types_rs::cacvote::{Election, Payload, SessionData, SignedObject};
use types_rs::election::ElectionDefinition;
use uuid::Uuid;

use crate::config::{Config, MAX_REQUEST_SIZE};
use crate::{db, smartcard};
use tokio::sync::broadcast;

// type AppState = (Config, PgPool, smartcard::StatusGetter);
type AppState = (Config, PgPool, smartcard::DynSmartcard);
#[derive(Clone)]
struct AppState {
pool: PgPool,
smartcard: smartcard::DynSmartcard,
broadcast_tx: broadcast::Sender<SessionData>,
}

/// Prepares the application with all the routes. Run the application with
/// `app::run(…)` once you have it.
Expand All @@ -50,14 +54,56 @@ pub(crate) fn setup(pool: PgPool, config: Config, smartcard: smartcard::DynSmart
}
};

let (broadcast_tx, _) = broadcast::channel(1);

tokio::spawn({
let jurisdiction_code = config.jurisdiction_code.clone();
let pool = pool.clone();
let smartcard = smartcard.clone();
let broadcast_tx = broadcast_tx.clone();
async move {
loop {
let mut connection = pool.acquire().await.unwrap();

let session_data = match smartcard.get_card_details() {
Some(CardDetailsWithAuthInfo { card_details, .. })
if card_details.jurisdiction_code() == jurisdiction_code =>
{
let elections = db::get_elections(&mut connection).await.unwrap();
SessionData::Authenticated {
jurisdiction_code: jurisdiction_code.clone(),
elections: elections
.into_iter()
.map(|e| e.election_definition)
.collect(),
}
}
Some(_) => SessionData::Unauthenticated {
has_smartcard: true,
},
None => SessionData::Unauthenticated {
has_smartcard: false,
},
};

let _ = broadcast_tx.send(session_data);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});

router
.route("/api/status", get(get_status))
.route("/api/status-stream", get(get_status_stream))
.route("/api/elections", get(get_elections))
.route("/api/elections", post(create_election))
.layer(DefaultBodyLimit::max(MAX_REQUEST_SIZE))
.layer(TraceLayer::new_for_http())
.with_state((config, pool, smartcard))
.with_state(AppState {
pool,
smartcard,
broadcast_tx,
})
}

/// Runs an application built by `app::setup(…)`.
Expand All @@ -74,34 +120,33 @@ async fn get_status() -> impl IntoResponse {
StatusCode::OK
}

fn distinct_until_changed<S: Stream>(stream: S) -> impl Stream<Item = S::Item>
where
S::Item: Clone + PartialEq,
{
let mut last = None;
stream.filter(move |item| {
let changed = last.as_ref() != Some(item);
last = Some(item.clone());
changed
})
}

async fn get_status_stream(
State((config, _, smartcard)): State<AppState>,
State(AppState { broadcast_tx, .. }): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
Sse::new(try_stream! {
let mut last_card_details = None;

loop {
let new_card_details = smartcard.get_card_details();

if new_card_details != last_card_details {
last_card_details = new_card_details.clone();
yield Event::default().json_data(SessionData {
auth_status: match new_card_details {
Some(CardDetailsWithAuthInfo { card_details, .. }) if card_details.jurisdiction_code() == config.jurisdiction_code => AuthStatus::Authenticated,
Some(_) => AuthStatus::UnauthenticatedInvalidCard,
None => AuthStatus::UnauthenticatedNoCard,
},
jurisdiction_code: Some(config.jurisdiction_code.clone()),
}).unwrap();
}
let broadcast_rx = broadcast_tx.subscribe();

sleep(Duration::from_millis(100)).await;
}
})
.keep_alive(KeepAlive::default())
let stream = distinct_until_changed(
tokio_stream::wrappers::BroadcastStream::new(broadcast_rx).filter_map(Result::ok),
)
.map(|data| Event::default().json_data(data).unwrap())
.map(Ok);

Sse::new(stream).keep_alive(KeepAlive::default())
}

async fn get_elections(State((_, pool, _)): State<AppState>) -> impl IntoResponse {
async fn get_elections(State(AppState { pool, .. }): State<AppState>) -> impl IntoResponse {
let mut connection = match pool.acquire().await {
Ok(connection) => connection,
Err(e) => {
Expand All @@ -128,7 +173,9 @@ async fn get_elections(State((_, pool, _)): State<AppState>) -> impl IntoRespons
}

async fn create_election(
State((_, pool, smartcard)): State<AppState>,
State(AppState {
pool, smartcard, ..
}): State<AppState>,
body: Bytes,
) -> impl IntoResponse {
let jurisdiction_code = match smartcard.get_card_details() {
Expand Down
3 changes: 2 additions & 1 deletion apps/cacvote-jx-terminal/backend/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub(crate) async fn get_elections(
signature
FROM objects
WHERE object_type = 'Election'
ORDER BY created_at DESC
"#,
)
.fetch_all(connection)
Expand All @@ -58,7 +59,7 @@ pub(crate) async fn get_elections(
for object in objects {
let payload = match object.try_to_inner() {
Ok(payload) => {
tracing::debug!("got object payload: {payload:?}");
tracing::trace!("got object payload: {payload:?}");
payload
}
Err(err) => {
Expand Down
9 changes: 9 additions & 0 deletions apps/cacvote-jx-terminal/backend/src/smartcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl SmartcardInner {
pub(crate) fn refresh_card_details(&mut self) {
let readers = self.readers_with_cards.lock().unwrap();
let Some(reader_name) = readers.first() else {
self.last_selected_card_reader_info = None;
return;
};

Expand All @@ -91,6 +92,11 @@ impl SmartcardInner {
}
}

// we shouldn't be connecting to the same card, but just in case we drop
// the current card to kill the connection and avoid the new connection
// failing
drop(self.card.take());

let reader = CardReader::new(self.ctx.clone(), reader_name.clone());
match reader.get_card() {
Ok(card) => {
Expand All @@ -106,12 +112,15 @@ impl SmartcardInner {
Ok(card_details_with_auth_info) => {
self.last_selected_card_reader_info =
Some((reader_name.to_string(), card_details_with_auth_info));
return;
}
Err(e) => {
tracing::error!("error reading card details: {e}");
}
}
}

self.last_selected_card_reader_info = None;
}
}

Expand Down
16 changes: 10 additions & 6 deletions apps/cacvote-jx-terminal/frontend/src/layouts/app_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn AppLayout(cx: Scope) -> Element {
use_coroutine(cx, {
to_owned![nav, session_data];
|_rx: UnboundedReceiver<i32>| async move {
log::info!("starting status stream");
let eventsource = web_sys::EventSource::new("/api/status-stream").unwrap();

let callback = Closure::wrap(Box::new(move |event: MessageEvent| {
Expand All @@ -25,12 +26,15 @@ pub fn AppLayout(cx: Scope) -> Element {
Ok(new_session_data) => {
log::info!("updating session data: {:?}", new_session_data);

if new_session_data.jurisdiction_code.is_none() {
log::info!("redirecting to machine locked page");
nav.push(Route::MachineLockedPage);
} else {
log::info!("redirecting to elections page");
nav.push(Route::ElectionsPage);
match new_session_data {
SessionData::Authenticated { .. } => {
log::info!("redirecting to elections page");
nav.push(Route::ElectionsPage);
}
SessionData::Unauthenticated { .. } => {
log::info!("redirecting to machine locked page");
nav.push(Route::MachineLockedPage);
}
}

*session_data.write() = new_session_data;
Expand Down
Loading

0 comments on commit f9f6dbc

Please sign in to comment.