From 450e496f2ca0f593ff190770910dd1006feb9c59 Mon Sep 17 00:00:00 2001 From: Ifeanyi Ubah Date: Mon, 22 Mar 2021 20:56:24 +0100 Subject: [PATCH] Move setup logic to lib to enable reuse (#215) * Move setup logic to lib to enable reuse * Update src/runner.rs Co-authored-by: Mark Mandel --- src/extensions/filter_registry.rs | 15 ++-- src/lib.rs | 1 + src/main.rs | 73 +------------------ src/runner.rs | 116 ++++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 75 deletions(-) create mode 100644 src/runner.rs diff --git a/src/extensions/filter_registry.rs b/src/extensions/filter_registry.rs index 1e0a3a4d17..68b13ecf57 100644 --- a/src/extensions/filter_registry.rs +++ b/src/extensions/filter_registry.rs @@ -348,12 +348,19 @@ pub struct FilterRegistry { } impl FilterRegistry { - /// insert registers a Filter under the provider's given name. - pub fn insert(&mut self, provider: P) + /// insert adds a [`FilterFactory`] to this filter registry. + pub fn insert(&mut self, factory: T) where - P: FilterFactory, + T: FilterFactory, { - self.registry.insert(provider.name(), Box::new(provider)); + self.registry.insert(factory.name(), Box::new(factory)); + } + + /// insert_all adds the provided [`FilterFactory`]s to this filter registry. + pub fn insert_all(&mut self, factories: Vec>) { + for factory in factories { + self.registry.insert(factory.name(), factory); + } } /// get returns an instance of a filter for a given Key. Returns Error if not found, diff --git a/src/lib.rs b/src/lib.rs index 8a90a44b70..aa26a8e8a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ pub mod config; pub mod extensions; pub mod metrics; pub mod proxy; +pub mod runner; pub mod test_utils; pub(crate) mod utils; pub(crate) mod xds; diff --git a/src/main.rs b/src/main.rs index 79586bd3e7..963c5133b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,78 +14,9 @@ * limitations under the License. */ -use std::fs::File; -use std::process; -use std::sync::Arc; - -use clap::App; -use slog::{info, o}; - -use prometheus::Registry; -use quilkin::config::Config; -use quilkin::proxy::{logger, Builder, Metrics}; -use tokio::signal; -use tokio::sync::watch; - -const VERSION: &str = env!("CARGO_PKG_VERSION"); - -#[cfg(debug_assertions)] -fn version() -> String { - format!("{}+debug", VERSION) -} - -#[cfg(not(debug_assertions))] -fn version() -> String { - VERSION.into() -} +use quilkin::runner::run; #[tokio::main] async fn main() { - let version = version(); - let base_logger = logger(); - let log = base_logger.new(o!("source" => "main")); - - let matches = App::new("Quilkin Proxy") - .version(version.as_str()) - .about("Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game servers") - .arg(clap::Arg::with_name("filename") - .short("f") - .long("filename") - .value_name("FILE") - .help("The yaml configuration file") - .required(true) - .takes_value(true)) - .get_matches(); - - let filename = matches.value_of("filename").unwrap(); - info!(log, "Starting Quilkin"; "version" => version); - - let config = Arc::new(Config::from_reader(File::open(filename).unwrap()).unwrap()); - let server = Builder::from(config) - .with_log(base_logger) - .with_metrics(Metrics::new( - Some("[::]:9091".parse().unwrap()), - Registry::default(), - )) - .validate() - .unwrap() - .build(); - - let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); - tokio::spawn(async move { - // Don't unwrap in order to ensure that we execute - // any subsequent shutdown tasks. - signal::ctrl_c().await.ok(); - shutdown_tx.send(()).ok(); - }); - - match server.run(shutdown_rx).await { - Ok(()) => { - info!(log, "Shutting down"); - } - Err(err) => { - info!(log, "Shutting down with error: {}", err); - process::exit(1); - } - } + run(vec![]).await.unwrap() } diff --git a/src/runner.rs b/src/runner.rs new file mode 100644 index 0000000000..bc5bf997c4 --- /dev/null +++ b/src/runner.rs @@ -0,0 +1,116 @@ +/* + * Copyright 2021 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::fs::File; +use std::sync::Arc; + +use clap::App; +use prometheus::Registry; +use slog::{info, o, Logger}; +use tokio::signal; +use tokio::sync::watch; + +use crate::config::Config; +use crate::extensions::{default_registry, FilterFactory, FilterRegistry}; +use crate::proxy::{logger, Builder, Metrics}; + +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +#[cfg(debug_assertions)] +fn version() -> String { + format!("{}+debug", VERSION) +} + +#[cfg(not(debug_assertions))] +fn version() -> String { + VERSION.into() +} + +/// Wraps an error message returned by [`run`]. +#[derive(Debug)] +pub struct Error(String); + +/// Start and run a proxy. Any passed in [`FilterFactory`] are included +/// alongside the default filter factories.. +pub async fn run(filter_factories: Vec>) -> Result<(), Error> { + let version = version(); + let base_logger = logger(); + let log = base_logger.new(o!("source" => "run")); + + let matches = App::new("Quilkin Proxy") + .version(version.as_str()) + .about("Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game servers") + .arg(clap::Arg::with_name("filename") + .short("f") + .long("filename") + .value_name("FILE") + .help("The yaml configuration file") + .required(true) + .takes_value(true)) + .get_matches(); + + let filename = matches + .value_of("filename") + .ok_or_else(|| Error("missing argument `filename`".into()))?; + info!(log, "Starting Quilkin"; "version" => version); + + let config = Arc::new( + Config::from_reader(File::open(filename).map_err(|err| Error(format!("{}", err)))?) + .map_err(|err| Error(format!("{}", err)))?, + ); + let server = Builder::from(config) + .with_log(base_logger) + .with_metrics(Metrics::new( + Some( + "[::]:9091" + .parse() + .map_err(|err| Error(format!("failed to create metrics address: {}", err)))?, + ), + Registry::default(), + )) + .with_filter_registry(create_filter_registry(&log, filter_factories)) + .validate() + .map_err(|err| Error(format!("{:?}", err)))? + .build(); + + let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); + tokio::spawn(async move { + // Don't unwrap in order to ensure that we execute + // any subsequent shutdown tasks. + signal::ctrl_c().await.ok(); + shutdown_tx.send(()).ok(); + }); + + match server.run(shutdown_rx).await { + Ok(()) => { + info!(log, "Shutting down"); + Ok(()) + } + Err(err) => { + info!(log, "Shutting down with error: {}", err); + Err(Error(format!("{:?}", err))) + } + } +} + +fn create_filter_registry( + log: &Logger, + additional_filter_factories: Vec>, +) -> FilterRegistry { + let mut registry = default_registry(log); + registry.insert_all(additional_filter_factories); + registry +}