Skip to content

Commit

Permalink
Remove ShardManagerMonitor and other gateway cleanup (serenity-rs#2372
Browse files Browse the repository at this point in the history
)

The bulk of this commit is removing `ShardManagerMonitor`. It was just a
background task that received `ShardManagerMessage`'s and called the respective
`ShardManager` function. Now, you can just call the respective `ShardManager`
function directly.
  • Loading branch information
kangalio authored and mkrasnitski committed May 30, 2023
1 parent 65a7214 commit c09eca4
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 465 deletions.
6 changes: 0 additions & 6 deletions src/client/bridge/gateway/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
use super::ShardId;
use crate::gateway::ConnectionStage;

#[allow(clippy::enum_variant_names)]
#[derive(Clone, Debug)]
pub(crate) enum ClientEvent {
ShardStageUpdate(ShardStageUpdateEvent),
}

/// An event denoting that a shard's connection stage was changed.
///
/// # Examples
Expand Down
51 changes: 1 addition & 50 deletions src/client/bridge/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! ### [`ShardQueuer`]
//!
//! The shard queuer is a light wrapper around an mpsc receiver that receives
//! [`ShardManagerMessage`]s. It should be run in its own thread so it can receive messages to
//! [`ShardQueuerMessage`]s. It should be run in its own thread so it can receive messages to
//! start shards in a queue.
//!
//! Refer to [its documentation][`ShardQueuer`] for more information.
Expand All @@ -43,7 +43,6 @@
pub mod event;

mod shard_manager;
mod shard_manager_monitor;
mod shard_messenger;
mod shard_queuer;
mod shard_runner;
Expand All @@ -53,62 +52,14 @@ use std::fmt;
use std::time::Duration as StdDuration;

pub use self::shard_manager::{ShardManager, ShardManagerOptions};
pub use self::shard_manager_monitor::{ShardManagerError, ShardManagerMonitor};
pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::ShardQueuer;
pub use self::shard_runner::{ShardRunner, ShardRunnerOptions};
pub use self::shard_runner_message::{ChunkGuildFilter, ShardRunnerMessage};
use crate::gateway::ConnectionStage;
use crate::model::event::Event;

/// A message either for a [`ShardManager`] or a [`ShardRunner`].
#[derive(Debug)]
pub enum ShardClientMessage {
/// A message intended to be worked with by a [`ShardManager`].
Manager(ShardManagerMessage),
/// A message intended to be worked with by a [`ShardRunner`].
Runner(Box<ShardRunnerMessage>),
}

/// A message for a [`ShardManager`] relating to an operation with a shard.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ShardManagerMessage {
/// Indicator that a [`ShardManagerMonitor`] should restart a shard.
Restart(ShardId),
/// An update from a shard runner,
ShardUpdate { id: ShardId, latency: Option<StdDuration>, stage: ConnectionStage },
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard without bringing it
/// back up.
Shutdown(ShardId, u16),
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards and end its
/// monitoring process for the [`ShardManager`].
ShutdownAll,
/// Indicator that a [`ShardManager`] has initiated a shutdown, and for the component that
/// receives this to also shutdown with no further action taken.
ShutdownInitiated,
/// Indicator that a [`ShardRunner`] has finished the shutdown of a shard, allowing it to move
/// toward the next one.
ShutdownFinished(ShardId),
/// Indicator that a shard sent invalid authentication (a bad token) when identifying with the
/// gateway. Emitted when a shard receives an [`InvalidAuthentication`] Error
///
/// [`InvalidAuthentication`]: crate::gateway::GatewayError::InvalidAuthentication
ShardInvalidAuthentication,
/// Indicator that a shard provided undocumented gateway intents. Emitted when a shard received
/// an [`InvalidGatewayIntents`] error.
///
/// [`InvalidGatewayIntents`]: crate::gateway::GatewayError::InvalidGatewayIntents
ShardInvalidGatewayIntents,
/// If a connection has been established but privileged gateway intents were provided without
/// enabling them prior. Emitted when a shard received a [`DisallowedGatewayIntents`] error.
///
/// [`DisallowedGatewayIntents`]: crate::gateway::GatewayError::DisallowedGatewayIntents
ShardDisallowedGatewayIntents,
}

/// A message to be sent to the [`ShardQueuer`].
///
/// This should usually be wrapped in a [`ShardClientMessage`].
#[derive(Clone, Debug)]
pub enum ShardQueuerMessage {
/// Message to start a shard, where the 0-index element is the ID of the Shard to start and the
Expand Down
87 changes: 54 additions & 33 deletions src/client/bridge/gateway/shard_manager.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::mpsc::{self, UnboundedReceiver as Receiver, UnboundedSender as Sender};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
#[cfg(feature = "framework")]
use once_cell::sync::OnceCell;
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout;
use tracing::{info, instrument, warn};
use typemap_rev::TypeMap;

use super::{
ShardId,
ShardManagerMessage,
ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
};
use super::{ShardId, ShardQueuer, ShardQueuerMessage, ShardRunnerInfo};
#[cfg(feature = "cache")]
use crate::cache::Cache;
#[cfg(feature = "voice")]
use crate::client::bridge::voice::VoiceGatewayManager;
use crate::client::{EventHandler, RawEventHandler};
#[cfg(feature = "framework")]
use crate::framework::Framework;
use crate::gateway::PresenceData;
use crate::gateway::{ConnectionStage, GatewayError, PresenceData};
use crate::http::Http;
use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
Expand Down Expand Up @@ -103,7 +97,7 @@ use crate::model::gateway::GatewayIntents;
/// [`Client`]: crate::Client
#[derive(Debug)]
pub struct ShardManager {
monitor_tx: Sender<ShardManagerMessage>,
return_value_tx: Sender<Result<(), GatewayError>>,
/// The shard runners currently managed.
///
/// **Note**: It is highly unrecommended to mutate this yourself unless you need to. Instead
Expand All @@ -117,30 +111,43 @@ pub struct ShardManager {
shard_total: u32,
shard_queuer: Sender<ShardQueuerMessage>,
shard_shutdown: Receiver<ShardId>,
shard_shutdown_send: Sender<ShardId>,
gateway_intents: GatewayIntents,
}

impl ShardManager {
/// Creates a new shard manager, returning both the manager and a monitor for usage in a
/// separate thread.
#[must_use]
pub fn new(opt: ShardManagerOptions) -> (Arc<Mutex<Self>>, ShardManagerMonitor) {
let (thread_tx, thread_rx) = mpsc::unbounded();
pub fn new(opt: ShardManagerOptions) -> (Arc<Mutex<Self>>, Receiver<Result<(), GatewayError>>) {
let (return_value_tx, return_value_rx) = mpsc::unbounded();
let (shard_queue_tx, shard_queue_rx) = mpsc::unbounded();

let runners = Arc::new(Mutex::new(HashMap::new()));
let (shutdown_send, shutdown_recv) = mpsc::unbounded();

let manager = Arc::new(Mutex::new(Self {
return_value_tx,
shard_index: opt.shard_index,
shard_init: opt.shard_init,
shard_queuer: shard_queue_tx,
shard_total: opt.shard_total,
shard_shutdown: shutdown_recv,
shard_shutdown_send: shutdown_send,
runners: Arc::clone(&runners),
gateway_intents: opt.intents,
}));

let mut shard_queuer = ShardQueuer {
data: opt.data,
event_handlers: opt.event_handlers,
raw_event_handlers: opt.raw_event_handlers,
#[cfg(feature = "framework")]
framework: opt.framework,
last_start: None,
manager_tx: thread_tx.clone(),
manager: Arc::clone(&manager),
queue: VecDeque::new(),
runners: Arc::clone(&runners),
runners,
rx: shard_queue_rx,
#[cfg(feature = "voice")]
voice_manager: opt.voice_manager,
Expand All @@ -156,22 +163,7 @@ impl ShardManager {
shard_queuer.run().await;
});

let manager = Arc::new(Mutex::new(Self {
monitor_tx: thread_tx,
shard_index: opt.shard_index,
shard_init: opt.shard_init,
shard_queuer: shard_queue_tx,
shard_total: opt.shard_total,
shard_shutdown: shutdown_recv,
runners,
gateway_intents: opt.intents,
}));

(Arc::clone(&manager), ShardManagerMonitor {
rx: thread_rx,
manager,
shutdown: shutdown_send,
})
(Arc::clone(&manager), return_value_rx)
}

/// Returns whether the shard manager contains either an active instance of a shard runner
Expand Down Expand Up @@ -324,7 +316,6 @@ impl ShardManager {
}

drop(self.shard_queuer.unbounded_send(ShardQueuerMessage::Shutdown));
drop(self.monitor_tx.unbounded_send(ShardManagerMessage::ShutdownInitiated));
}

#[instrument(skip(self))]
Expand All @@ -341,6 +332,37 @@ impl ShardManager {
pub fn intents(&self) -> GatewayIntents {
self.gateway_intents
}

pub async fn return_with_value(&mut self, ret: Result<(), GatewayError>) {
if let Err(e) = self.return_value_tx.send(ret).await {
tracing::warn!("failed to send return value: {}", e);
}
}

pub fn shutdown_finished(&self, id: ShardId) {
if let Err(e) = self.shard_shutdown_send.unbounded_send(id) {
tracing::warn!("failed to notify about finished shutdown: {}", e);
}
}

pub async fn restart_shard(&mut self, id: ShardId) {
self.restart(id).await;
if let Err(e) = self.shard_shutdown_send.unbounded_send(id) {
tracing::warn!("failed to notify about finished shutdown: {}", e);
}
}

pub async fn shard_update(
&self,
id: ShardId,
latency: Option<Duration>,
stage: ConnectionStage,
) {
if let Some(runner) = self.runners.lock().await.get_mut(&id) {
runner.latency = latency;
runner.stage = stage;
}
}
}

impl Drop for ShardManager {
Expand All @@ -352,7 +374,6 @@ impl Drop for ShardManager {
/// [`ShardRunner`]: super::ShardRunner
fn drop(&mut self) {
drop(self.shard_queuer.unbounded_send(ShardQueuerMessage::Shutdown));
drop(self.monitor_tx.unbounded_send(ShardManagerMessage::ShutdownInitiated));
}
}

Expand Down
112 changes: 0 additions & 112 deletions src/client/bridge/gateway/shard_manager_monitor.rs

This file was deleted.

Loading

0 comments on commit c09eca4

Please sign in to comment.