Skip to content

Commit

Permalink
Added channels pooling for amqp hooks (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Mar 28, 2023
1 parent 4ed7b3f commit 560f9d6
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 123 deletions.
6 changes: 5 additions & 1 deletion docs/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,11 @@ Configuration parameters:
* `--hooks-amqp-durable-exchange` - adds durability to created exchange;
* `--hooks-amqp-durable-queues` - adds durability to created;
* `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html);
* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange.
* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange;
* `--hooks-amqp-connection-pool-size` - maximum number of opened connections to RabbitMQ;
* `--hooks-amqp-channel-pool-size` - maximum number of opened channels for each connection to RabbitMQ;
* `--hooks-amqp-idle-connection-timeout` - timeout for idle connection. If the connection isn't used, it's dropped;
* `--hooks-amqp-idle-channels-timeout` - timeout for idle channels. If the channel isn't used, it's dropped.

If no hooks_amqp_routing_key specified, rustus will send all messages with
different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on.
Expand Down
103 changes: 69 additions & 34 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,43 +147,10 @@ pub struct InfoStoreOptions {
)]
pub info_db_dsn: Option<String>,
}

#[derive(Parser, Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct NotificationsOptions {
/// Notifications format.
///
/// This format will be used in all
/// messages about hooks.
#[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")]
pub hooks_format: Format,

/// Enabled hooks for notifications.
#[arg(
long,
default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish",
env = "RUSTUS_HOOKS",
use_value_delimiter = true
)]
pub hooks: Vec<Hook>,

/// Use this option if you use rustus
/// behind any proxy. Like Nginx or Traefik.
#[arg(long, env = "RUSTUS_BEHIND_PROXY")]
pub behind_proxy: bool,

/// List of URLS to send webhooks to.
#[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)]
pub hooks_http_urls: Vec<String>,

// List of headers to forward from client.
#[arg(
long,
env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS",
use_value_delimiter = true
)]
pub hooks_http_proxy_headers: Vec<String>,

pub struct AMQPHooksOptions {
/// Url for AMQP server.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")]
Expand Down Expand Up @@ -239,6 +206,71 @@ pub struct NotificationsOptions {
)]
pub hooks_amqp_queues_prefix: String,

/// Maximum number of connections for RabbitMQ.
#[cfg(feature = "amqp_notifier")]
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_connection_pool_size: u32,

/// Maximum number of opened channels for each connection.
#[cfg(feature = "amqp_notifier")]
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_channel_pool_size: u32,

/// After this amount of time the connection will be dropped.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT")]
pub hooks_amqp_idle_connection_timeout: Option<u64>,

/// After this amount of time in seconds, the channel will be closed.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT")]
pub hooks_amqp_idle_channels_timeout: Option<u64>,
}

#[derive(Parser, Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct NotificationsOptions {
/// Notifications format.
///
/// This format will be used in all
/// messages about hooks.
#[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")]
pub hooks_format: Format,

/// Enabled hooks for notifications.
#[arg(
long,
default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish",
env = "RUSTUS_HOOKS",
use_value_delimiter = true
)]
pub hooks: Vec<Hook>,

/// Use this option if you use rustus
/// behind any proxy. Like Nginx or Traefik.
#[arg(long, env = "RUSTUS_BEHIND_PROXY")]
pub behind_proxy: bool,

/// List of URLS to send webhooks to.
#[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)]
pub hooks_http_urls: Vec<String>,

// List of headers to forward from client.
#[arg(
long,
env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS",
use_value_delimiter = true
)]
pub hooks_http_proxy_headers: Vec<String>,

/// Directory for executable hook files.
/// This parameter is used to call executables from dir.
#[arg(long, env = "RUSTUS_HOOKS_DIR")]
Expand All @@ -248,6 +280,9 @@ pub struct NotificationsOptions {
/// notifying about upload status.
#[arg(long, env = "RUSTUS_HOOKS_FILE")]
pub hooks_file: Option<String>,

#[command(flatten)]
pub amqp_hook_opts: AMQPHooksOptions,
}

#[derive(Debug, Parser, Clone)]
Expand Down
186 changes: 128 additions & 58 deletions src/notifiers/amqp_notifier.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
config::AMQPHooksOptions,
notifiers::{Hook, Notifier},
RustusResult,
};
Expand All @@ -9,8 +10,9 @@ use bb8_lapin::LapinConnectionManager;
use lapin::{
options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
types::{AMQPValue, FieldTable, LongString},
BasicProperties, ConnectionProperties, ExchangeKind,
BasicProperties, ChannelState, ConnectionProperties, ExchangeKind,
};
use std::time::Duration;
use strum::IntoEnumIterator;

#[allow(clippy::struct_excessive_bools)]
Expand All @@ -25,36 +27,107 @@ pub struct DeclareOptions {
#[derive(Clone)]
pub struct AMQPNotifier {
exchange_name: String,
pool: Pool<LapinConnectionManager>,
channel_pool: Pool<ChannelPool>,
queues_prefix: String,
exchange_kind: String,
routing_key: Option<String>,
declare_options: DeclareOptions,
celery: bool,
}

/// Channel manager for lapin channels.
///
/// This manager is used with bb8 pool,
/// so it maintains opened channels for every connections.
///
/// This pool uses connection pool
/// and issues new connections from it.
#[derive(Clone)]
pub struct ChannelPool {
pool: Pool<LapinConnectionManager>,
}

impl ChannelPool {
pub fn new(pool: Pool<LapinConnectionManager>) -> Self {
ChannelPool { pool }
}
}

/// ManagerConnection for ChannelPool.
///
/// This manager helps you maintain opened channels.
#[async_trait::async_trait]
impl bb8::ManageConnection for ChannelPool {
type Connection = lapin::Channel;
type Error = lapin::Error;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
Ok(self
.pool
.get()
.await
.map_err(|err| match err {
bb8::RunError::TimedOut => lapin::Error::ChannelsLimitReached,
bb8::RunError::User(user_err) => user_err,
})?
.create_channel()
.await?)
}

async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
let valid_states = vec![ChannelState::Initial, ChannelState::Connected];
if valid_states.contains(&conn.status().state()) {
Ok(())
} else {
Err(lapin::Error::InvalidChannel(conn.id()))
}
}

fn has_broken(&self, conn: &mut Self::Connection) -> bool {
let broken_states = vec![ChannelState::Closed, ChannelState::Error];
broken_states.contains(&conn.status().state())
}
}

impl AMQPNotifier {
#[allow(clippy::fn_params_excessive_bools)]
pub async fn new(
amqp_url: &str,
exchange: &str,
queues_prefix: &str,
exchange_kind: &str,
routing_key: Option<String>,
declare_options: DeclareOptions,
celery: bool,
) -> RustusResult<Self> {
let manager = LapinConnectionManager::new(amqp_url, ConnectionProperties::default());
let pool = bb8::Pool::builder().build(manager).await?;
pub async fn new(options: AMQPHooksOptions) -> RustusResult<Self> {
let manager = LapinConnectionManager::new(
options.hooks_amqp_url.unwrap().as_str(),
ConnectionProperties::default(),
);
let connection_pool = bb8::Pool::builder()
.idle_timeout(
options
.hooks_amqp_idle_connection_timeout
.map(Duration::from_secs),
)
.max_size(options.hooks_amqp_connection_pool_size)
.build(manager)
.await?;
let channel_pool = bb8::Pool::builder()
.idle_timeout(
options
.hooks_amqp_idle_channels_timeout
.map(Duration::from_secs),
)
.max_size(options.hooks_amqp_channel_pool_size)
.build(ChannelPool::new(connection_pool))
.await?;

Ok(Self {
pool,
celery,
routing_key,
declare_options,
exchange_kind: exchange_kind.into(),
exchange_name: exchange.into(),
queues_prefix: queues_prefix.into(),
channel_pool,
celery: options.hooks_amqp_celery,
routing_key: options.hooks_amqp_routing_key,
declare_options: DeclareOptions {
declare_exchange: options.hooks_amqp_declare_exchange,
durable_exchange: options.hooks_amqp_durable_exchange,
declare_queues: options.hooks_amqp_declare_queues,
durable_queues: options.hooks_amqp_durable_queues,
},
exchange_kind: options.hooks_amqp_exchange_kind,
exchange_name: options.hooks_amqp_exchange,
queues_prefix: options.hooks_amqp_queues_prefix,
})
}

Expand All @@ -74,7 +147,7 @@ impl AMQPNotifier {
#[async_trait(?Send)]
impl Notifier for AMQPNotifier {
async fn prepare(&mut self) -> RustusResult<()> {
let chan = self.pool.get().await?.create_channel().await?;
let chan = self.channel_pool.get().await?;
if self.declare_options.declare_exchange {
chan.exchange_declare(
self.exchange_name.as_str(),
Expand Down Expand Up @@ -118,7 +191,7 @@ impl Notifier for AMQPNotifier {
hook: Hook,
_header_map: &HeaderMap,
) -> RustusResult<()> {
let chan = self.pool.get().await?.create_channel().await?;
let chan = self.channel_pool.get().await?;
let queue = self.get_queue_name(hook);
let routing_key = self.routing_key.as_ref().unwrap_or(&queue);
let payload = if self.celery {
Expand Down Expand Up @@ -162,20 +235,22 @@ mod tests {

async fn get_notifier() -> AMQPNotifier {
let amqp_url = std::env::var("TEST_AMQP_URL").unwrap();
let mut notifier = AMQPNotifier::new(
amqp_url.as_str(),
uuid::Uuid::new_v4().to_string().as_str(),
uuid::Uuid::new_v4().to_string().as_str(),
"topic",
None,
DeclareOptions {
declare_exchange: true,
declare_queues: true,
durable_queues: false,
durable_exchange: false,
},
true,
)
let mut notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions {
hooks_amqp_url: Some(amqp_url),
hooks_amqp_declare_exchange: true,
hooks_amqp_declare_queues: true,
hooks_amqp_durable_exchange: false,
hooks_amqp_durable_queues: false,
hooks_amqp_celery: true,
hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(),
hooks_amqp_exchange_kind: String::from("topic"),
hooks_amqp_routing_key: None,
hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(),
hooks_amqp_connection_pool_size: 1,
hooks_amqp_channel_pool_size: 1,
hooks_amqp_idle_connection_timeout: None,
hooks_amqp_idle_channels_timeout: None,
})
.await
.unwrap();
notifier.prepare().await.unwrap();
Expand All @@ -191,14 +266,7 @@ mod tests {
.send_message(test_msg.clone(), hook.clone(), &HeaderMap::new())
.await
.unwrap();
let chan = notifier
.pool
.get()
.await
.unwrap()
.create_channel()
.await
.unwrap();
let chan = notifier.channel_pool.get().await.unwrap();
let message = chan
.basic_get(
format!("{}.{}", notifier.queues_prefix.as_str(), hook).as_str(),
Expand All @@ -220,20 +288,22 @@ mod tests {

#[actix_rt::test]
async fn unknown_url() {
let notifier = AMQPNotifier::new(
"http://unknown",
"test",
"test",
"topic",
None,
DeclareOptions {
declare_exchange: false,
declare_queues: false,
durable_queues: false,
durable_exchange: false,
},
false,
)
let notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions {
hooks_amqp_url: Some(String::from("http://unknown")),
hooks_amqp_declare_exchange: true,
hooks_amqp_declare_queues: true,
hooks_amqp_durable_exchange: false,
hooks_amqp_durable_queues: false,
hooks_amqp_celery: false,
hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(),
hooks_amqp_exchange_kind: String::from("topic"),
hooks_amqp_routing_key: None,
hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(),
hooks_amqp_connection_pool_size: 1,
hooks_amqp_channel_pool_size: 1,
hooks_amqp_idle_connection_timeout: None,
hooks_amqp_idle_channels_timeout: None,
})
.await
.unwrap();
let res = notifier
Expand Down
Loading

0 comments on commit 560f9d6

Please sign in to comment.