From 560f9d6a7b9990dde59b3d2ae68f96d6c097b2a4 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Wed, 29 Mar 2023 03:57:44 +0400 Subject: [PATCH] Added channels pooling for amqp hooks (#121) --- docs/hooks.md | 6 +- src/config.rs | 103 ++++++---- src/notifiers/amqp_notifier.rs | 186 +++++++++++++------ src/notifiers/models/notification_manager.rs | 37 +--- 4 files changed, 209 insertions(+), 123 deletions(-) diff --git a/docs/hooks.md b/docs/hooks.md index d89ad3d..4bc372a 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -894,7 +894,11 @@ Configuration parameters: * `--hooks-amqp-durable-exchange` - adds durability to created exchange; * `--hooks-amqp-durable-queues` - adds durability to created; * `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html); -* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange. +* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange; +* `--hooks-amqp-connection-pool-size` - maximum number of opened connections to RabbitMQ; +* `--hooks-amqp-channel-pool-size` - maximum number of opened channels for each connection to RabbitMQ; +* `--hooks-amqp-idle-connection-timeout` - timeout for idle connection. If the connection isn't used, it's dropped; +* `--hooks-amqp-idle-channels-timeout` - timeout for idle channels. If the channel isn't used, it's dropped. If no hooks_amqp_routing_key specified, rustus will send all messages with different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. diff --git a/src/config.rs b/src/config.rs index 4481369..6e80696 100644 --- a/src/config.rs +++ b/src/config.rs @@ -147,43 +147,10 @@ pub struct InfoStoreOptions { )] pub info_db_dsn: Option, } - #[derive(Parser, Debug, Clone)] #[allow(clippy::struct_excessive_bools)] -pub struct NotificationsOptions { - /// Notifications format. - /// - /// This format will be used in all - /// messages about hooks. - #[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")] - pub hooks_format: Format, - - /// Enabled hooks for notifications. - #[arg( - long, - default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish", - env = "RUSTUS_HOOKS", - use_value_delimiter = true - )] - pub hooks: Vec, - - /// Use this option if you use rustus - /// behind any proxy. Like Nginx or Traefik. - #[arg(long, env = "RUSTUS_BEHIND_PROXY")] - pub behind_proxy: bool, - - /// List of URLS to send webhooks to. - #[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)] - pub hooks_http_urls: Vec, - - // List of headers to forward from client. - #[arg( - long, - env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", - use_value_delimiter = true - )] - pub hooks_http_proxy_headers: Vec, +pub struct AMQPHooksOptions { /// Url for AMQP server. #[cfg(feature = "amqp_notifier")] #[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")] @@ -239,6 +206,71 @@ pub struct NotificationsOptions { )] pub hooks_amqp_queues_prefix: String, + /// Maximum number of connections for RabbitMQ. + #[cfg(feature = "amqp_notifier")] + #[arg( + long, + env = "RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE", + default_value = "10" + )] + pub hooks_amqp_connection_pool_size: u32, + + /// Maximum number of opened channels for each connection. + #[cfg(feature = "amqp_notifier")] + #[arg( + long, + env = "RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE", + default_value = "10" + )] + pub hooks_amqp_channel_pool_size: u32, + + /// After this amount of time the connection will be dropped. + #[cfg(feature = "amqp_notifier")] + #[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT")] + pub hooks_amqp_idle_connection_timeout: Option, + + /// After this amount of time in seconds, the channel will be closed. + #[cfg(feature = "amqp_notifier")] + #[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT")] + pub hooks_amqp_idle_channels_timeout: Option, +} + +#[derive(Parser, Debug, Clone)] +#[allow(clippy::struct_excessive_bools)] +pub struct NotificationsOptions { + /// Notifications format. + /// + /// This format will be used in all + /// messages about hooks. + #[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")] + pub hooks_format: Format, + + /// Enabled hooks for notifications. + #[arg( + long, + default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish", + env = "RUSTUS_HOOKS", + use_value_delimiter = true + )] + pub hooks: Vec, + + /// Use this option if you use rustus + /// behind any proxy. Like Nginx or Traefik. + #[arg(long, env = "RUSTUS_BEHIND_PROXY")] + pub behind_proxy: bool, + + /// List of URLS to send webhooks to. + #[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)] + pub hooks_http_urls: Vec, + + // List of headers to forward from client. + #[arg( + long, + env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", + use_value_delimiter = true + )] + pub hooks_http_proxy_headers: Vec, + /// Directory for executable hook files. /// This parameter is used to call executables from dir. #[arg(long, env = "RUSTUS_HOOKS_DIR")] @@ -248,6 +280,9 @@ pub struct NotificationsOptions { /// notifying about upload status. #[arg(long, env = "RUSTUS_HOOKS_FILE")] pub hooks_file: Option, + + #[command(flatten)] + pub amqp_hook_opts: AMQPHooksOptions, } #[derive(Debug, Parser, Clone)] diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index 8a08aa3..4272b29 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -1,4 +1,5 @@ use crate::{ + config::AMQPHooksOptions, notifiers::{Hook, Notifier}, RustusResult, }; @@ -9,8 +10,9 @@ use bb8_lapin::LapinConnectionManager; use lapin::{ options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions}, types::{AMQPValue, FieldTable, LongString}, - BasicProperties, ConnectionProperties, ExchangeKind, + BasicProperties, ChannelState, ConnectionProperties, ExchangeKind, }; +use std::time::Duration; use strum::IntoEnumIterator; #[allow(clippy::struct_excessive_bools)] @@ -25,7 +27,7 @@ pub struct DeclareOptions { #[derive(Clone)] pub struct AMQPNotifier { exchange_name: String, - pool: Pool, + channel_pool: Pool, queues_prefix: String, exchange_kind: String, routing_key: Option, @@ -33,28 +35,99 @@ pub struct AMQPNotifier { celery: bool, } +/// Channel manager for lapin channels. +/// +/// This manager is used with bb8 pool, +/// so it maintains opened channels for every connections. +/// +/// This pool uses connection pool +/// and issues new connections from it. +#[derive(Clone)] +pub struct ChannelPool { + pool: Pool, +} + +impl ChannelPool { + pub fn new(pool: Pool) -> Self { + ChannelPool { pool } + } +} + +/// ManagerConnection for ChannelPool. +/// +/// This manager helps you maintain opened channels. +#[async_trait::async_trait] +impl bb8::ManageConnection for ChannelPool { + type Connection = lapin::Channel; + type Error = lapin::Error; + + async fn connect(&self) -> Result { + Ok(self + .pool + .get() + .await + .map_err(|err| match err { + bb8::RunError::TimedOut => lapin::Error::ChannelsLimitReached, + bb8::RunError::User(user_err) => user_err, + })? + .create_channel() + .await?) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let valid_states = vec![ChannelState::Initial, ChannelState::Connected]; + if valid_states.contains(&conn.status().state()) { + Ok(()) + } else { + Err(lapin::Error::InvalidChannel(conn.id())) + } + } + + fn has_broken(&self, conn: &mut Self::Connection) -> bool { + let broken_states = vec![ChannelState::Closed, ChannelState::Error]; + broken_states.contains(&conn.status().state()) + } +} + impl AMQPNotifier { #[allow(clippy::fn_params_excessive_bools)] - pub async fn new( - amqp_url: &str, - exchange: &str, - queues_prefix: &str, - exchange_kind: &str, - routing_key: Option, - declare_options: DeclareOptions, - celery: bool, - ) -> RustusResult { - let manager = LapinConnectionManager::new(amqp_url, ConnectionProperties::default()); - let pool = bb8::Pool::builder().build(manager).await?; + pub async fn new(options: AMQPHooksOptions) -> RustusResult { + let manager = LapinConnectionManager::new( + options.hooks_amqp_url.unwrap().as_str(), + ConnectionProperties::default(), + ); + let connection_pool = bb8::Pool::builder() + .idle_timeout( + options + .hooks_amqp_idle_connection_timeout + .map(Duration::from_secs), + ) + .max_size(options.hooks_amqp_connection_pool_size) + .build(manager) + .await?; + let channel_pool = bb8::Pool::builder() + .idle_timeout( + options + .hooks_amqp_idle_channels_timeout + .map(Duration::from_secs), + ) + .max_size(options.hooks_amqp_channel_pool_size) + .build(ChannelPool::new(connection_pool)) + .await?; Ok(Self { - pool, - celery, - routing_key, - declare_options, - exchange_kind: exchange_kind.into(), - exchange_name: exchange.into(), - queues_prefix: queues_prefix.into(), + channel_pool, + celery: options.hooks_amqp_celery, + routing_key: options.hooks_amqp_routing_key, + declare_options: DeclareOptions { + declare_exchange: options.hooks_amqp_declare_exchange, + durable_exchange: options.hooks_amqp_durable_exchange, + declare_queues: options.hooks_amqp_declare_queues, + durable_queues: options.hooks_amqp_durable_queues, + }, + exchange_kind: options.hooks_amqp_exchange_kind, + exchange_name: options.hooks_amqp_exchange, + queues_prefix: options.hooks_amqp_queues_prefix, }) } @@ -74,7 +147,7 @@ impl AMQPNotifier { #[async_trait(?Send)] impl Notifier for AMQPNotifier { async fn prepare(&mut self) -> RustusResult<()> { - let chan = self.pool.get().await?.create_channel().await?; + let chan = self.channel_pool.get().await?; if self.declare_options.declare_exchange { chan.exchange_declare( self.exchange_name.as_str(), @@ -118,7 +191,7 @@ impl Notifier for AMQPNotifier { hook: Hook, _header_map: &HeaderMap, ) -> RustusResult<()> { - let chan = self.pool.get().await?.create_channel().await?; + let chan = self.channel_pool.get().await?; let queue = self.get_queue_name(hook); let routing_key = self.routing_key.as_ref().unwrap_or(&queue); let payload = if self.celery { @@ -162,20 +235,22 @@ mod tests { async fn get_notifier() -> AMQPNotifier { let amqp_url = std::env::var("TEST_AMQP_URL").unwrap(); - let mut notifier = AMQPNotifier::new( - amqp_url.as_str(), - uuid::Uuid::new_v4().to_string().as_str(), - uuid::Uuid::new_v4().to_string().as_str(), - "topic", - None, - DeclareOptions { - declare_exchange: true, - declare_queues: true, - durable_queues: false, - durable_exchange: false, - }, - true, - ) + let mut notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions { + hooks_amqp_url: Some(amqp_url), + hooks_amqp_declare_exchange: true, + hooks_amqp_declare_queues: true, + hooks_amqp_durable_exchange: false, + hooks_amqp_durable_queues: false, + hooks_amqp_celery: true, + hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(), + hooks_amqp_exchange_kind: String::from("topic"), + hooks_amqp_routing_key: None, + hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(), + hooks_amqp_connection_pool_size: 1, + hooks_amqp_channel_pool_size: 1, + hooks_amqp_idle_connection_timeout: None, + hooks_amqp_idle_channels_timeout: None, + }) .await .unwrap(); notifier.prepare().await.unwrap(); @@ -191,14 +266,7 @@ mod tests { .send_message(test_msg.clone(), hook.clone(), &HeaderMap::new()) .await .unwrap(); - let chan = notifier - .pool - .get() - .await - .unwrap() - .create_channel() - .await - .unwrap(); + let chan = notifier.channel_pool.get().await.unwrap(); let message = chan .basic_get( format!("{}.{}", notifier.queues_prefix.as_str(), hook).as_str(), @@ -220,20 +288,22 @@ mod tests { #[actix_rt::test] async fn unknown_url() { - let notifier = AMQPNotifier::new( - "http://unknown", - "test", - "test", - "topic", - None, - DeclareOptions { - declare_exchange: false, - declare_queues: false, - durable_queues: false, - durable_exchange: false, - }, - false, - ) + let notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions { + hooks_amqp_url: Some(String::from("http://unknown")), + hooks_amqp_declare_exchange: true, + hooks_amqp_declare_queues: true, + hooks_amqp_durable_exchange: false, + hooks_amqp_durable_queues: false, + hooks_amqp_celery: false, + hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(), + hooks_amqp_exchange_kind: String::from("topic"), + hooks_amqp_routing_key: None, + hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(), + hooks_amqp_connection_pool_size: 1, + hooks_amqp_channel_pool_size: 1, + hooks_amqp_idle_connection_timeout: None, + hooks_amqp_idle_channels_timeout: None, + }) .await .unwrap(); let res = notifier diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 5326250..c9f449c 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -46,39 +46,16 @@ impl NotificationManager { ))); } #[cfg(feature = "amqp_notifier")] - if rustus_config.notification_opts.hooks_amqp_url.is_some() { + if rustus_config + .notification_opts + .amqp_hook_opts + .hooks_amqp_url + .is_some() + { debug!("Found AMQP notifier."); manager.notifiers.push(Box::new( amqp_notifier::AMQPNotifier::new( - rustus_config - .notification_opts - .hooks_amqp_url - .as_ref() - .unwrap(), - rustus_config.notification_opts.hooks_amqp_exchange.as_str(), - rustus_config - .notification_opts - .hooks_amqp_queues_prefix - .as_str(), - rustus_config - .notification_opts - .hooks_amqp_exchange_kind - .as_str(), - rustus_config - .notification_opts - .hooks_amqp_routing_key - .clone(), - amqp_notifier::DeclareOptions { - declare_exchange: rustus_config - .notification_opts - .hooks_amqp_declare_exchange, - declare_queues: rustus_config.notification_opts.hooks_amqp_declare_queues, - durable_exchange: rustus_config - .notification_opts - .hooks_amqp_durable_exchange, - durable_queues: rustus_config.notification_opts.hooks_amqp_durable_queues, - }, - rustus_config.notification_opts.hooks_amqp_celery, + rustus_config.notification_opts.amqp_hook_opts.clone(), ) .await?, ));