Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add new D-Bus Updates interface #521

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ env_logger = "0.8"
envsubst = "0.2"
fail = "0.4"
failure = "0.1"
failure_derive = "0.1.8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do #519 instead of going this way.

filetime = "0.2"
futures = "0.3"
glob = "0.3"
Expand Down
16 changes: 14 additions & 2 deletions src/dbus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

mod experimental;
use experimental::Experimental;
mod updates;
use updates::Updates;

use crate::update_agent::UpdateAgent;
use actix::prelude::*;
Expand All @@ -12,6 +14,9 @@ use log::trace;
use zbus::fdo;
use zvariant::ObjectPath;

const ZINCATI_BUS_NAME: &str = "org.coreos.zincati";
const ZINCATI_OBJECT_PATH: &str = "/org/coreos/zincati";

pub struct DBusService {
agent_addr: Addr<UpdateAgent>,
}
Expand All @@ -31,7 +36,7 @@ impl DBusService {
let connection = zbus::Connection::new_system()?;

fdo::DBusProxy::new(&connection)?.request_name(
"org.coreos.zincati",
ZINCATI_BUS_NAME,
fdo::RequestNameFlags::ReplaceExisting.into(),
)?;

Expand All @@ -40,9 +45,16 @@ impl DBusService {
agent_addr: self.agent_addr.clone(),
};
object_server.at(
&ObjectPath::try_from("/org/coreos/zincati")?,
&ObjectPath::try_from(ZINCATI_OBJECT_PATH)?,
experimental_interface,
)?;
let updates_interface = Updates {
agent_addr: self.agent_addr.clone(),
};
object_server.at(
&ObjectPath::try_from(ZINCATI_OBJECT_PATH)?,
updates_interface,
)?;

loop {
if let Err(err) = object_server.try_handle_next() {
Expand Down
82 changes: 82 additions & 0 deletions src/dbus/updates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Updates interface for ushering the update agent to various states.

use crate::update_agent::{RefreshTick, RefreshTickCommand, UpdateAgent, UpdateAgentState};
use actix::Addr;
use failure::Error;
use fdo::Error::Failed;
use futures::prelude::*;
use tokio::runtime::Runtime;
use zbus::{dbus_interface, fdo};

/// Updates interface for checking for and finalizing updates.
pub(crate) struct Updates {
pub(crate) agent_addr: Addr<UpdateAgent>,
}

impl Updates {
/// Send msg to the update agent actor and wait for the returned future to resolve.
fn send_msg_to_agent(
&self,
msg: RefreshTick,
) -> Result<Result<UpdateAgentState, Error>, fdo::Error> {
let refresh_time_fut = self.agent_addr.send(msg).map_err(|e| {
let err_msg = format!("failed to get last refresh time from agent actor: {}", e);
log::error!("LastRefreshTime D-Bus method call: {}", err_msg);
Failed(err_msg)
});

Runtime::new()
.map_err(|e| {
let err_msg = format!("failed to create runtime to execute future: {}", e);
log::error!("{}", err_msg);
Failed(err_msg)
})
.and_then(|mut runtime| runtime.block_on(refresh_time_fut))
}
}

#[dbus_interface(name = "org.coreos.zincati.Updates")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep this under the existing Experimental interface for now, and iterate on the design there.

impl Updates {
/// Check for update immediately.
fn check_update(&self) -> fdo::Result<Vec<String>> {
let msg = RefreshTick {
command: RefreshTickCommand::CheckUpdate,
};

self.send_msg_to_agent(msg).and_then(|res| match res {
Ok(state) => match state {
UpdateAgentState::NoNewUpdate => Ok(vec![]),
UpdateAgentState::UpdateAvailable((release, _)) => Ok(vec![release.version]),
_ => {
let err_msg = "update agent reached unexpected state after update check";
log::error!("CheckUpdate D-Bus method call: {}", err_msg);
Err(Failed(String::from(err_msg)))
}
},
Err(e) => Err(Failed(format!("{}", e))),
})
}

/// Finalize update immediately.
fn finalize_update(&self, force: bool) -> fdo::Result<Vec<String>> {
let msg = RefreshTick {
command: RefreshTickCommand::FinalizeUpdate { force },
};

self.send_msg_to_agent(msg).and_then(|res| match res {
Ok(state) => match state {
UpdateAgentState::UpdateStaged(_) => {
Err(Failed(String::from("update finalization attempt failed")))
}
UpdateAgentState::UpdateFinalized(release) => Ok(vec![release.version]),
_ => {
let err_msg =
"update agent reached unexpected state after finalization attempt";
log::error!("FinalizeUpdate D-Bus method call: {}", err_msg);
Err(Failed(String::from(err_msg)))
}
},
Err(e) => Err(Failed(format!("{}", e))),
})
}
}
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
extern crate fail;
#[macro_use]
extern crate prometheus;
#[macro_use]
extern crate failure_derive;

// Cincinnati client.
mod cincinnati;
Expand Down
113 changes: 96 additions & 17 deletions src/update_agent/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Actor for UpdateAgent {
}

// Kick-start the state machine.
Self::tick_now(ctx);
self.tick_now(ctx);
}
}

Expand All @@ -48,16 +48,64 @@ impl Handler<LastRefresh> for UpdateAgent {
}
}

pub(crate) struct RefreshTick {}
/// Error thrown when the command that attempted to initiate a refresh tick
/// is not permitted to do so in the current state of the update agent.
#[derive(Debug, Fail)]
struct TickPermissionError {}
Copy link
Contributor

@lucab lucab Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Permission usually has a different semantic nuance, related to a lack of privileges. In this case this is mostly related to an unsatisfiable request or an unfulfilled precondition. I don't have a proper suggestion for this right now, but it would be better to rename this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InvalidStateError?


impl std::fmt::Display for TickPermissionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "command not permitted in current update agent state")
}
}

pub enum RefreshTickCommand {
/// Command to check for updates.
CheckUpdate,
/// Command to finalize an update.
FinalizeUpdate { force: bool },
/// Tick initiated by update agent itself.
SelfTick,
}

/// A message to trigger an update agent tick.
pub struct RefreshTick {
/// Command that initiated the tick.
pub(crate) command: RefreshTickCommand,
}

impl RefreshTick {
/// Return whether the command in the command field is allowed to initiate
/// a refresh tick.
fn check_state(&self, cur_state: &UpdateAgentState) -> bool {
match self.command {
RefreshTickCommand::CheckUpdate => {
matches!(cur_state, UpdateAgentState::NoNewUpdate)
}
RefreshTickCommand::FinalizeUpdate { force: _ } => {
matches!(cur_state, UpdateAgentState::UpdateAvailable { .. })
}
// SelfTicks are always allowed.
RefreshTickCommand::SelfTick => true,
}
}
}

impl Message for RefreshTick {
type Result = Result<(), Error>;
type Result = Result<UpdateAgentState, Error>;
}

impl Handler<RefreshTick> for UpdateAgent {
type Result = ResponseActFuture<Self, Result<(), Error>>;
type Result = ResponseActFuture<Self, Result<UpdateAgentState, Error>>;

/// Return the state of the update agent's state machine after msg is handled.
fn handle(&mut self, msg: RefreshTick, ctx: &mut Self::Context) -> Self::Result {
// Make sure that the command that sent the RefreshTick is permitted to be
// called in update agent's current state.
if !msg.check_state(&self.state) {
return Box::pin(actix::fut::err(Error::from(TickPermissionError {})));
}

fn handle(&mut self, _msg: RefreshTick, ctx: &mut Self::Context) -> Self::Result {
let tick_timestamp = chrono::Utc::now();
LAST_REFRESH.set(tick_timestamp.timestamp());

Expand All @@ -75,7 +123,7 @@ impl Handler<RefreshTick> for UpdateAgent {
}
UpdateAgentState::UpdateStaged((release, _)) => {
let update = release.clone();
self.tick_finalize_update(update)
self.tick_finalize_update(update, &msg)
}
UpdateAgentState::UpdateFinalized(release) => {
let update = release.clone();
Expand All @@ -91,31 +139,50 @@ impl Handler<RefreshTick> for UpdateAgent {
"scheduling next agent refresh in {} seconds",
pause.as_secs()
);
Self::tick_later(ctx, pause);
actor.tick_later(ctx, pause);
} else {
let update_timestamp = chrono::Utc::now();
actor.state_changed = update_timestamp;
Self::tick_now(ctx);
actor.tick_now(ctx);
}
actix::fut::ready(())
});

// Process state machine refresh ticks sequentially.
ctx.wait(update_machine);

Box::pin(actix::fut::ok(()))
Box::pin(actix::fut::ok(self.state.clone()))
}
}

impl UpdateAgent {
/// Cancel the scheduled refresh tick whose handle is stored in the update agent's
/// `tick_later_handle` field, if any.
fn cancel_scheduled_ticks(&mut self, ctx: &mut Context<Self>) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When transitioning "naturally" (no messages received from the D-Bus actor) from a "steady state" (e.g. NoNewUpdates or UpdateAvailable) to a new state, tick_later_handle is a handle to
a scheduled msg that has already been sent, so there is no need to cancel it. But it doesn't look like there is any harm in calling ctx.cancel_future() on a handle that is already sent, so for simplicity, always attempt to cancel.
But let me know if this doesn't make sense.

if let Some(handle) = self.tick_later_handle {
ctx.cancel_future(handle);
self.tick_later_handle = None;
}
}

/// Schedule an immediate refresh of the state machine.
pub fn tick_now(ctx: &mut Context<Self>) {
ctx.notify(RefreshTick {})
pub fn tick_now(&mut self, ctx: &mut Context<Self>) {
// Cancel scheduled ticks, if any.
self.cancel_scheduled_ticks(ctx);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here it is possible to have a race where a self tick msg and a command msg is sent at the same time, and the command msg arrives first.
If this happens, we can't really cancel the scheduled tick like this because the message had alrady been sent. Though I don't think it is too big of a deal to have an extra tick; in practice, this results in two continuous ticks without any delay in between when Zincati is in a steady state (NoNewUpdates and UpdateStaged) where it periodically sends itself refresh ticks.

ctx.notify(RefreshTick {
command: RefreshTickCommand::SelfTick,
})
}

/// Schedule a delayed refresh of the state machine.
pub fn tick_later(ctx: &mut Context<Self>, after: std::time::Duration) -> actix::SpawnHandle {
ctx.notify_later(RefreshTick {}, after)
pub fn tick_later(&mut self, ctx: &mut Context<Self>, after: std::time::Duration) {
let handle = ctx.notify_later(
RefreshTick {
command: RefreshTickCommand::SelfTick,
},
after,
);
self.tick_later_handle = Some(handle);
}

/// Pausing interval between state-machine refresh cycles.
Expand Down Expand Up @@ -322,12 +389,23 @@ impl UpdateAgent {
fn tick_finalize_update(
&mut self,
release: Release,
msg: &RefreshTick,
) -> ResponseActFuture<Self, Result<(), ()>> {
trace!("trying to finalize an update");

let strategy_can_finalize = self.strategy.can_finalize();
let state_change = actix::fut::wrap_future::<_, Self>(strategy_can_finalize)
.then(|strategy_can_finalize, actor, _ctx| {
let mut strategy_can_finalize = false;
let mut usersessions_can_finalize = false;
if let RefreshTickCommand::FinalizeUpdate { force } = msg.command {
strategy_can_finalize = force;
// If msg's associated command is FinalizeUpdate, ignore logged in sessions
// and allow finalizations even when active user sessions are present.
usersessions_can_finalize = true;
}

let strategy_can_finalize_fut = self.strategy.can_finalize();
let state_change = actix::fut::wrap_future::<_, Self>(strategy_can_finalize_fut)
.then(move |can_finalize, actor, _ctx| {
strategy_can_finalize = strategy_can_finalize || can_finalize;
if !strategy_can_finalize {
update_unit_status(&format!(
"update staged: {}; reboot pending due to update strategy",
Expand All @@ -338,7 +416,8 @@ impl UpdateAgent {
actor.state.update_staged(release);
Box::pin(actix::fut::err(()))
} else {
let usersessions_can_finalize = actor.state.usersessions_can_finalize();
usersessions_can_finalize =
usersessions_can_finalize || actor.state.usersessions_can_finalize();
if !usersessions_can_finalize {
update_unit_status(&format!(
"update staged: {}; reboot delayed due to active user sessions",
Expand Down
9 changes: 6 additions & 3 deletions src/update_agent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Update agent.

mod actor;
pub use actor::LastRefresh;
pub use actor::{LastRefresh, RefreshTick, RefreshTickCommand};

use crate::cincinnati::Cincinnati;
use crate::config::Settings;
use crate::identity::Identity;
use crate::rpm_ostree::{Release, RpmOstreeClient};
use crate::strategy::UpdateStrategy;
use actix::Addr;
use actix::{Addr, SpawnHandle};
use chrono::prelude::*;
use failure::{bail, Fallible, ResultExt};
use prometheus::{IntCounter, IntGauge};
Expand Down Expand Up @@ -89,7 +89,7 @@ where

/// State machine for the agent.
#[derive(Clone, Debug, PartialEq, Eq)]
enum UpdateAgentState {
pub enum UpdateAgentState {
/// Initial state upon actor start.
StartState,
/// Agent initialized.
Expand Down Expand Up @@ -349,6 +349,8 @@ pub(crate) struct UpdateAgent {
state: UpdateAgentState,
/// Timestamp of last state transition.
state_changed: DateTime<Utc>,
/// Handle to future created by `tick_later`.
tick_later_handle: Option<SpawnHandle>,
}

impl UpdateAgent {
Expand All @@ -368,6 +370,7 @@ impl UpdateAgent {
state: UpdateAgentState::default(),
strategy: cfg.strategy,
state_changed: chrono::Utc::now(),
tick_later_handle: None,
};

Ok(agent)
Expand Down