Skip to content

Commit

Permalink
ref(actix): Migrate server actor to the "service" arch (#1723)
Browse files Browse the repository at this point in the history
This change introduces changes to migrate `Server` actor to the new
`Service` architecture:
* remove `actix` dependencies from the `Server` actor
* implement `Service` for the `Server` actor
* introduce the `HttpServer` helper struct, which takes care of starting
the shutting down the `actix_web` http service, and removes exposed
`Recipient` from the `Server` actor - now everything is hidden and in
the future can be removed from the one place.

The former `Server` actor and current `ServerService` subscribes only to
`Shutdown` watch channel and makes sure to trigger the shutdown of the
http server with all its workers.
  • Loading branch information
olksdr committed Jan 11, 2023
1 parent 821729d commit 60d054a
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 75 deletions.
7 changes: 5 additions & 2 deletions relay-server/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Actors require an actix system to run, see [`relay_system`] and particularly
//! [`Controller`](relay_system::Controller) for more information.
//!
//! The web server is wrapped by the [`Server`](server::Server) actor. It starts the actix http web
//! The web server is wrapped by the [`ServerService`](server::ServerService) actor. It starts the actix http web
//! server and relays the graceful shutdown signal. Internally, it creates several other actors
//! comprising the service state:
//!
Expand All @@ -25,7 +25,10 @@
//! use relay_server::controller::Controller;
//! use relay_server::server::Server;
//!
//! Controller::run(|| Server::start())
//! let rt = tokio::runtime::Runtime::new().unwrap();
//! let sys = actix::System::new("my-system");
//!
//! Controller::run(rt.handle(), sys, || Server::start())
//! .expect("failed to start relay");
//! ```
pub mod envelopes;
Expand Down
60 changes: 24 additions & 36 deletions relay-server/src/actors/server.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,37 @@
use ::actix::prelude::*;
use actix_web::server::StopServer;
use futures01::prelude::*;

use relay_config::Config;
use relay_statsd::metric;
use relay_system::{Controller, Shutdown};
use relay_system::{Controller, Service, Shutdown};

use crate::service;
use crate::service::HttpServer;
use crate::statsd::RelayCounters;

pub struct Server {
http_server: Recipient<StopServer>,
pub struct ServerService {
http_server: HttpServer,
}

impl Server {
pub fn start(config: Config) -> anyhow::Result<Addr<Self>> {
impl ServerService {
pub fn start(config: Config) -> anyhow::Result<()> {
metric!(counter(RelayCounters::ServerStarting) += 1);
let http_server = service::start(config)?;
Ok(Server { http_server }.start())
}
}

impl Actor for Server {
type Context = Context<Self>;

fn started(&mut self, context: &mut Self::Context) {
Controller::subscribe(context.address());
let http_server = HttpServer::start(config)?;
Self { http_server }.start();
Ok(())
}
}

impl Handler<Shutdown> for Server {
type Result = ResponseFuture<(), ()>;

fn handle(&mut self, message: Shutdown, _context: &mut Self::Context) -> Self::Result {
let graceful = message.timeout.is_some();

// We assume graceful shutdown if we're given a timeout. The actix-web http server is
// configured with the same timeout, so it will match. Unfortunately, we have to drop any
// errors and replace them with the generic `TimeoutError`.
let future = self
.http_server
.send(StopServer { graceful })
.map_err(|_| ())
.and_then(|result| result.map_err(|_| ()));

Box::new(future)
impl Service for ServerService {
type Interface = ();

fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();
loop {
tokio::select! {
Shutdown { timeout } = shutdown.notified() => {
self.http_server.shutdown(timeout.is_some());
},
else => break,
}
}
});
}
}
22 changes: 20 additions & 2 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod utils;
use relay_config::Config;
use relay_system::Controller;

use crate::actors::server::Server;
use crate::actors::server::ServerService;

/// Runs a relay web server and spawns all internal worker threads.
///
Expand All @@ -280,5 +280,23 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// Run the controller and block until a shutdown signal is sent to this process. This will
// create an actix system, start a web server and run all relevant actors inside. See the
// `actors` module documentation for more information on all actors.
Controller::run(|| Server::start(config))

// Create the old tokio 0.x runtime. It's required for old actix System to exist by `create_runtime` utiliy.
//
// TODO(actix): this can be removed once all the actors are on the new tokio. It looks like
// that this mostly needed for Upstream actor right now. ANd
let sys = actix::System::new("relay");
// We also need new tokio 1.x runtime. This cannot be created in the [`relay_system::Controller`] since
// it cannot access the `create_runtime` utilily function from there. This can be changed once
// the [`actix::System`] is removed.
let runtime = utils::create_runtime("http-server-handler", 1);
let shutdown_timeout = config.shutdown_timeout();

Controller::run(runtime.handle(), sys, || ServerService::start(config))?;

// Properly shutdown the new tokio runtime.
runtime.shutdown_timeout(shutdown_timeout);
relay_log::info!("relay shutdown complete");

Ok(())
}
75 changes: 49 additions & 26 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::fmt;
use std::sync::Arc;

use actix::prelude::*;
use actix_web::server::StopServer;
use actix_web::{server, App};
use anyhow::{Context, Result};
use futures01::Future;
use listenfd::ListenFd;
use once_cell::race::OnceBox;

Expand Down Expand Up @@ -320,30 +322,51 @@ where
}
}

/// Given a relay config spawns the server together with all actors and lets them run forever.
///
/// Effectively this boots the server.
pub fn start(config: Config) -> Result<Recipient<server::StopServer>> {
let config = Arc::new(config);

Controller::from_registry().do_send(Configure {
shutdown_timeout: config.shutdown_timeout(),
});

let state = ServiceState::start(config.clone())?;
let mut server = server::new(move || make_app(state.clone()));
server = server
.workers(config.cpu_concurrency())
.shutdown_timeout(config.shutdown_timeout().as_secs() as u16)
.keep_alive(config.keepalive_timeout().as_secs() as usize)
.maxconn(config.max_connections())
.maxconnrate(config.max_connection_rate())
.backlog(config.max_pending_connections())
.disable_signals();

server = listen(server, &config)?;
server = listen_ssl(server, &config)?;

dump_listen_infos(&server);
Ok(server.start().recipient())
/// Keeps the address to the running http servers and helps with start/stop handling.
pub struct HttpServer(Recipient<StopServer>);

impl HttpServer {
/// Given a relay config spawns the server together with all actors and lets them run forever.
///
/// Effectively this boots the server.
pub fn start(config: Config) -> Result<Self> {
let config = Arc::new(config);

Controller::from_registry().do_send(Configure {
shutdown_timeout: config.shutdown_timeout(),
});

let state = ServiceState::start(config.clone())?;
let mut server = server::new(move || make_app(state.clone()));
server = server
.workers(config.cpu_concurrency())
.shutdown_timeout(config.shutdown_timeout().as_secs() as u16)
.keep_alive(config.keepalive_timeout().as_secs() as usize)
.maxconn(config.max_connections())
.maxconnrate(config.max_connection_rate())
.backlog(config.max_pending_connections())
.disable_signals();

server = listen(server, &config)?;
server = listen_ssl(server, &config)?;

dump_listen_infos(&server);
let recipient = server.start().recipient();
Ok(Self(recipient))
}

/// Triggers the shutdown process by sending [`actix_web::server::StopServer`] to the running http server.
pub fn shutdown(&self, graceful: bool) {
let Self(recipient) = self;
relay_log::info!("Shutting down HTTP server");
recipient.send(StopServer { graceful }).wait().ok();
}
}

impl fmt::Debug for HttpServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("HttpServer")
.field(&"actix::Recipient<actix_web::server::StopServer>")
.finish()
}
}
25 changes: 16 additions & 9 deletions relay-system/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use actix::actors::signal;
use actix::fut;
use actix::prelude::*;
use actix::SystemRunner;
use futures01::future;
use futures01::prelude::*;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -82,7 +83,7 @@ impl ShutdownHandle {
/// }
///
///
/// Controller::run(|| -> Result<(), ()> {
/// Controller::run(tokio::runtime::Runtime::new().unwrap().handle(), System::new("my-actix-system"), || -> Result<(), ()> {
/// MyActor.start();
/// # System::current().stop();
/// Ok(())
Expand All @@ -101,22 +102,29 @@ impl Controller {
SystemService::from_registry()
}

/// Starts an actix system and runs the `factory` to start actors.
/// Runs the `factory` to start actors.
///
/// The function accepts the old tokio 0.x [`actix::SystemRunner`] and the reference to
/// [`tokio::runtime::Handle`] from the new tokio 1.x runtime, which we enter before the
/// factory is run, to make sure that two systems, old and new one, are available.
///
/// The factory may be used to start actors in the actix system before it runs. If the factory
/// returns an error, the actix system is not started and instead an error returned. Otherwise,
/// the system blocks the current thread until a shutdown signal is sent to the server and all
/// actors have completed a graceful shutdown.
pub fn run<F, R, E>(factory: F) -> Result<(), E>
pub fn run<F, R, E>(
handle: &tokio::runtime::Handle,
sys: SystemRunner,
factory: F,
) -> Result<(), E>
where
F: FnOnce() -> Result<R, E>,
F: FnOnce() -> Result<R, E> + 'static,
F: Sync + Send,
{
let sys = System::new("relay");

compat::init();

// Run the factory and exit early if an error happens. The return value of the factory is
// discarded for convenience, to allow shorthand notations.
// While starting http server ensure that the new tokio 1.x system is available.
let _guard = handle.enter();
factory()?;

// Ensure that the controller starts if no actor has started it yet. It will register with
Expand All @@ -128,7 +136,6 @@ impl Controller {
// until a signal arrives or `Controller::stop` is called.
relay_log::info!("relay server starting");
sys.run();
relay_log::info!("relay shutdown complete");

Ok(())
}
Expand Down

0 comments on commit 60d054a

Please sign in to comment.