Skip to content

Commit

Permalink
explicit state synchronisation
Browse files Browse the repository at this point in the history
this prevents a race condition between requesting activation
and configuration changes in the gtk frontend
  • Loading branch information
feschber committed May 11, 2024
1 parent 9969f99 commit 31fe26a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 133 deletions.
12 changes: 6 additions & 6 deletions src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,18 @@ pub enum FrontendRequest {
UpdatePosition(ClientHandle, Position),
/// update fix-ips
UpdateFixIps(ClientHandle, Vec<IpAddr>),
/// request the state of the given client
GetState(ClientHandle),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FrontendEvent {
/// a client was created
Created(ClientHandle, ClientConfig, ClientState),
/// a client was updated
Updated(ClientHandle, ClientConfig),
/// no such client
NoSuchClient(ClientHandle),
/// state changed
StateChange(ClientHandle, ClientState),
State(ClientHandle, ClientConfig, ClientState),
/// the client was deleted
Deleted(ClientHandle),
/// new port, reason of failure (if failed)
Expand Down Expand Up @@ -235,9 +237,7 @@ impl FrontendListener {
let json = serde_json::to_string(&notify).unwrap();
let payload = json.as_bytes();
let len = payload.len().to_be_bytes();
log::debug!("json: {json}, len: {}", payload.len());

log::debug!("broadcasting event to streams: {:?}", self.tx_streams);
log::debug!("broadcasting event to streams: {json}");
let mut keep = vec![];
// TODO do simultaneously
for tx in self.tx_streams.iter_mut() {
Expand Down
66 changes: 22 additions & 44 deletions src/frontend/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ impl<'a> Cli<'a> {
}
}

async fn update_client(&mut self, handle: ClientHandle) -> Result<()> {
self.send_request(FrontendRequest::GetState(handle)).await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::State(_, _, _) | FrontendEvent::NoSuchClient(_) = event {
break;
}
}
Ok(())
}

async fn execute(&mut self, cmd: Command) -> Result<()> {
match cmd {
Command::None => {}
Expand All @@ -125,14 +137,8 @@ impl<'a> Cli<'a> {
FrontendRequest::UpdatePosition(handle, pos),
] {
self.send_request(request).await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::Updated(_, _) = event {
break;
}
}
}
self.update_client(handle).await?;
}
Command::Disconnect(id) => {
self.send_request(FrontendRequest::Delete(id)).await?;
Expand All @@ -148,26 +154,12 @@ impl<'a> Cli<'a> {
Command::Activate(id) => {
self.send_request(FrontendRequest::Activate(id, true))
.await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::StateChange(_, _) = event {
self.handle_event(event);
break;
}
}
self.update_client(id).await?;
}
Command::Deactivate(id) => {
self.send_request(FrontendRequest::Activate(id, false))
.await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::StateChange(_, _) = event {
self.handle_event(event);
break;
}
}
self.update_client(id).await?;
}
Command::List => {
self.send_request(FrontendRequest::Enumerate()).await?;
Expand All @@ -182,25 +174,12 @@ impl<'a> Cli<'a> {
Command::SetHost(handle, host) => {
let request = FrontendRequest::UpdateHostname(handle, Some(host.clone()));
self.send_request(request).await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::Updated(_, _) = event {
self.handle_event(event);
break;
}
}
self.update_client(handle).await?;
}
Command::SetPort(handle, port) => {
let request = FrontendRequest::UpdatePort(handle, port.unwrap_or(DEFAULT_PORT));
self.send_request(request).await?;
loop {
let event = self.await_event().await?;
self.handle_event(event.clone());
if let FrontendEvent::Updated(_, _) = event {
break;
}
}
self.update_client(handle).await?;
}
Command::Help => {
for cmd_type in [
Expand Down Expand Up @@ -244,8 +223,11 @@ impl<'a> Cli<'a> {
eprintln!();
self.clients.push((h, c, s));
}
FrontendEvent::Updated(h, c) => {
if let Some((_, config, _)) = self.find_mut(h) {
FrontendEvent::NoSuchClient(h) => {
eprintln!("no such client: {h}");
}
FrontendEvent::State(h, c, s) => {
if let Some((_, config, state)) = self.find_mut(h) {
let old_host = config.hostname.clone().unwrap_or("\"\"".into());
let new_host = c.hostname.clone().unwrap_or("\"\"".into());
if old_host != new_host {
Expand All @@ -261,10 +243,6 @@ impl<'a> Cli<'a> {
eprintln!("client {h} ips updated: {:?}", c.fix_ips)
}
*config = c;
}
}
FrontendEvent::StateChange(h, s) => {
if let Some((_, _, state)) = self.find_mut(h) {
if state.active ^ s.active {
eprintln!(
"client {h} {}",
Expand Down
7 changes: 3 additions & 4 deletions src/frontend/gtk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,11 @@ fn build_ui(app: &Application) {
FrontendEvent::Deleted(client) => {
window.delete_client(client);
}
FrontendEvent::Updated(handle, client) => {
window.update_client_config(handle, client);
}
FrontendEvent::StateChange(handle, state) => {
FrontendEvent::State(handle, config, state) => {
window.update_client_config(handle, config);
window.update_client_state(handle, state);
}
FrontendEvent::NoSuchClient(_) => { }
FrontendEvent::Error(e) => {
window.show_toast(e.as_str());
},
Expand Down
80 changes: 38 additions & 42 deletions src/frontend/gtk/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl Window {
.expect("Could not get clients")
}

fn client_by_idx(&self, idx: u32) -> Option<ClientObject> {
self.clients().item(idx).map(|o| o.downcast().unwrap())
}

fn setup_clients(&self) {
let model = gio::ListStore::new::<ClientObject>();
self.imp().clients.replace(Some(model));
Expand All @@ -62,27 +66,24 @@ impl Window {
let client_object = obj.downcast_ref().expect("Expected object of type `ClientObject`.");
let row = window.create_client_row(client_object);
row.connect_closure("request-update", false, closure_local!(@strong window => move |row: ClientRow, active: bool| {
let index = row.index() as u32;
let Some(client) = window.clients().item(index) else {
return;
};
let client = client.downcast_ref::<ClientObject>().unwrap();
window.request_client_activate(client, active);
window.request_client_update(client);
if let Some(client) = window.client_by_idx(row.index() as u32) {
window.request_client_activate(&client, active);
window.request_client_update(&client);
window.request_client_state(&client);
}
}));
row.connect_closure("request-delete", false, closure_local!(@strong window => move |row: ClientRow| {
let index = row.index() as u32;
window.request_client_delete(index);
if let Some(client) = window.client_by_idx(row.index() as u32) {
window.request_client_delete(&client);
}
}));
row.connect_closure("request-dns", false, closure_local!(@strong window => move
|row: ClientRow| {
let index = row.index() as u32;
let Some(client) = window.clients().item(index) else {
return;
};
let client = client.downcast_ref::<ClientObject>().unwrap();
window.request_client_update(client);
window.request_dns(index);
if let Some(client) = window.client_by_idx(row.index() as u32) {
window.request_client_update(&client);
window.request_dns(&client);
window.request_client_state(&client);
}
}));
row.upcast()
})
Expand Down Expand Up @@ -204,27 +205,30 @@ impl Window {
}
}

pub fn request_dns(&self, idx: u32) {
let client_object = self.clients().item(idx).unwrap();
let client_object: &ClientObject = client_object.downcast_ref().unwrap();
let data = client_object.get_data();
let event = FrontendRequest::ResolveDns(data.handle);
pub fn request_port_change(&self) {
let port = self.imp().port_entry.get().text().to_string();
if let Ok(port) = port.as_str().parse::<u16>() {
self.request(FrontendRequest::ChangePort(port));
} else {
self.request(FrontendRequest::ChangePort(DEFAULT_PORT));
}
}

pub fn request_client_state(&self, client: &ClientObject) {
let handle = client.handle();
let event = FrontendRequest::GetState(handle);
self.request(event);
}

pub fn request_client_create(&self) {
let event = FrontendRequest::Create;
self.imp().set_port(DEFAULT_PORT);
self.request(event);
}

pub fn request_port_change(&self) {
let port = self.imp().port_entry.get().text().to_string();
if let Ok(port) = port.as_str().parse::<u16>() {
self.request(FrontendRequest::ChangePort(port));
} else {
self.request(FrontendRequest::ChangePort(DEFAULT_PORT));
}
pub fn request_dns(&self, client: &ClientObject) {
let data = client.get_data();
let event = FrontendRequest::ResolveDns(data.handle);
self.request(event);
}

pub fn request_client_update(&self, client: &ClientObject) {
Expand All @@ -239,33 +243,25 @@ impl Window {
FrontendRequest::UpdatePosition(handle, position),
FrontendRequest::UpdatePort(handle, port),
] {
log::debug!("requesting: {event:?}");
self.request(event);
}
}

pub fn request_client_activate(&self, client: &ClientObject, active: bool) {
let handle = client.handle();

let event = FrontendRequest::Activate(handle, active);
log::debug!("requesting: {event:?}");
self.request(event);
}

pub fn request_client_delete(&self, idx: u32) {
if let Some(obj) = self.clients().item(idx) {
let client_object: &ClientObject = obj
.downcast_ref()
.expect("Expected object of type `ClientObject`.");
let handle = client_object.handle();
let event = FrontendRequest::Delete(handle);
self.request(event);
}
pub fn request_client_delete(&self, client: &ClientObject) {
let handle = client.handle();
let event = FrontendRequest::Delete(handle);
self.request(event);
}

pub fn request(&self, event: FrontendRequest) {
let json = serde_json::to_string(&event).unwrap();
log::debug!("requesting {json}");
log::debug!("requesting: {json}");
let mut stream = self.imp().stream.borrow_mut();
let stream = stream.as_mut().unwrap();
let bytes = json.as_bytes();
Expand Down
Loading

0 comments on commit 31fe26a

Please sign in to comment.