Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ShardManagerMonitor and other gateway cleanup #2372

Merged
merged 11 commits into from
Apr 8, 2023
6 changes: 0 additions & 6 deletions src/client/bridge/gateway/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
use super::ShardId;
use crate::gateway::ConnectionStage;

#[allow(clippy::enum_variant_names)]
#[derive(Clone, Debug)]
pub(crate) enum ClientEvent {
ShardStageUpdate(ShardStageUpdateEvent),
}

/// An event denoting that a shard's connection stage was changed.
///
/// # Examples
Expand Down
51 changes: 1 addition & 50 deletions src/client/bridge/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! ### [`ShardQueuer`]
//!
//! The shard queuer is a light wrapper around an mpsc receiver that receives
//! [`ShardManagerMessage`]s. It should be run in its own thread so it can receive messages to
//! [`ShardQueuerMessage`]s. It should be run in its own thread so it can receive messages to
//! start shards in a queue.
//!
//! Refer to [its documentation][`ShardQueuer`] for more information.
Expand All @@ -43,7 +43,6 @@
pub mod event;

mod shard_manager;
mod shard_manager_monitor;
mod shard_messenger;
mod shard_queuer;
mod shard_runner;
Expand All @@ -53,62 +52,14 @@ use std::fmt;
use std::time::Duration as StdDuration;

pub use self::shard_manager::{ShardManager, ShardManagerOptions};
pub use self::shard_manager_monitor::{ShardManagerError, ShardManagerMonitor};
pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::ShardQueuer;
pub use self::shard_runner::{ShardRunner, ShardRunnerOptions};
pub use self::shard_runner_message::{ChunkGuildFilter, ShardRunnerMessage};
use crate::gateway::ConnectionStage;
use crate::model::event::Event;

/// A message either for a [`ShardManager`] or a [`ShardRunner`].
#[derive(Debug)]
pub enum ShardClientMessage {
/// A message intended to be worked with by a [`ShardManager`].
Manager(ShardManagerMessage),
/// A message intended to be worked with by a [`ShardRunner`].
Runner(Box<ShardRunnerMessage>),
}

/// A message for a [`ShardManager`] relating to an operation with a shard.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ShardManagerMessage {
/// Indicator that a [`ShardManagerMonitor`] should restart a shard.
Restart(ShardId),
/// An update from a shard runner,
ShardUpdate { id: ShardId, latency: Option<StdDuration>, stage: ConnectionStage },
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown a shard without bringing it
/// back up.
Shutdown(ShardId, u16),
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards and end its
/// monitoring process for the [`ShardManager`].
ShutdownAll,
/// Indicator that a [`ShardManager`] has initiated a shutdown, and for the component that
/// receives this to also shutdown with no further action taken.
ShutdownInitiated,
/// Indicator that a [`ShardRunner`] has finished the shutdown of a shard, allowing it to move
/// toward the next one.
ShutdownFinished(ShardId),
/// Indicator that a shard sent invalid authentication (a bad token) when identifying with the
/// gateway. Emitted when a shard receives an [`InvalidAuthentication`] Error
///
/// [`InvalidAuthentication`]: crate::gateway::GatewayError::InvalidAuthentication
ShardInvalidAuthentication,
/// Indicator that a shard provided undocumented gateway intents. Emitted when a shard received
/// an [`InvalidGatewayIntents`] error.
///
/// [`InvalidGatewayIntents`]: crate::gateway::GatewayError::InvalidGatewayIntents
ShardInvalidGatewayIntents,
/// If a connection has been established but privileged gateway intents were provided without
/// enabling them prior. Emitted when a shard received a [`DisallowedGatewayIntents`] error.
///
/// [`DisallowedGatewayIntents`]: crate::gateway::GatewayError::DisallowedGatewayIntents
ShardDisallowedGatewayIntents,
}

/// A message to be sent to the [`ShardQueuer`].
///
/// This should usually be wrapped in a [`ShardClientMessage`].
#[derive(Clone, Debug)]
pub enum ShardQueuerMessage {
/// Message to start a shard, where the 0-index element is the ID of the Shard to start and the
Expand Down
87 changes: 54 additions & 33 deletions src/client/bridge/gateway/shard_manager.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::mpsc::{self, UnboundedReceiver as Receiver, UnboundedSender as Sender};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
#[cfg(feature = "framework")]
use once_cell::sync::OnceCell;
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout;
use tracing::{info, instrument, warn};
use typemap_rev::TypeMap;

use super::{
ShardId,
ShardManagerMessage,
ShardManagerMonitor,
ShardQueuer,
ShardQueuerMessage,
ShardRunnerInfo,
};
use super::{ShardId, ShardQueuer, ShardQueuerMessage, ShardRunnerInfo};
#[cfg(feature = "cache")]
use crate::cache::Cache;
#[cfg(feature = "voice")]
use crate::client::bridge::voice::VoiceGatewayManager;
use crate::client::{EventHandler, RawEventHandler};
#[cfg(feature = "framework")]
use crate::framework::Framework;
use crate::gateway::PresenceData;
use crate::gateway::{ConnectionStage, GatewayError, PresenceData};
use crate::http::Http;
use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
Expand Down Expand Up @@ -103,7 +97,7 @@ use crate::model::gateway::GatewayIntents;
/// [`Client`]: crate::Client
#[derive(Debug)]
pub struct ShardManager {
monitor_tx: Sender<ShardManagerMessage>,
return_value_tx: Sender<Result<(), GatewayError>>,
/// The shard runners currently managed.
///
/// **Note**: It is highly unrecommended to mutate this yourself unless you need to. Instead
Expand All @@ -117,30 +111,43 @@ pub struct ShardManager {
shard_total: u32,
shard_queuer: Sender<ShardQueuerMessage>,
shard_shutdown: Receiver<ShardId>,
shard_shutdown_send: Sender<ShardId>,
gateway_intents: GatewayIntents,
}

impl ShardManager {
/// Creates a new shard manager, returning both the manager and a monitor for usage in a
/// separate thread.
#[must_use]
pub fn new(opt: ShardManagerOptions) -> (Arc<Mutex<Self>>, ShardManagerMonitor) {
let (thread_tx, thread_rx) = mpsc::unbounded();
pub fn new(opt: ShardManagerOptions) -> (Arc<Mutex<Self>>, Receiver<Result<(), GatewayError>>) {
let (return_value_tx, return_value_rx) = mpsc::unbounded();
let (shard_queue_tx, shard_queue_rx) = mpsc::unbounded();

let runners = Arc::new(Mutex::new(HashMap::new()));
let (shutdown_send, shutdown_recv) = mpsc::unbounded();

let manager = Arc::new(Mutex::new(Self {
return_value_tx,
shard_index: opt.shard_index,
shard_init: opt.shard_init,
shard_queuer: shard_queue_tx,
shard_total: opt.shard_total,
shard_shutdown: shutdown_recv,
shard_shutdown_send: shutdown_send,
runners: Arc::clone(&runners),
gateway_intents: opt.intents,
}));

let mut shard_queuer = ShardQueuer {
data: opt.data,
event_handlers: opt.event_handlers,
raw_event_handlers: opt.raw_event_handlers,
#[cfg(feature = "framework")]
framework: opt.framework,
last_start: None,
manager_tx: thread_tx.clone(),
manager: Arc::clone(&manager),
queue: VecDeque::new(),
runners: Arc::clone(&runners),
runners,
rx: shard_queue_rx,
#[cfg(feature = "voice")]
voice_manager: opt.voice_manager,
Expand All @@ -156,22 +163,7 @@ impl ShardManager {
shard_queuer.run().await;
});

let manager = Arc::new(Mutex::new(Self {
monitor_tx: thread_tx,
shard_index: opt.shard_index,
shard_init: opt.shard_init,
shard_queuer: shard_queue_tx,
shard_total: opt.shard_total,
shard_shutdown: shutdown_recv,
runners,
gateway_intents: opt.intents,
}));

(Arc::clone(&manager), ShardManagerMonitor {
rx: thread_rx,
manager,
shutdown: shutdown_send,
})
(Arc::clone(&manager), return_value_rx)
}

/// Returns whether the shard manager contains either an active instance of a shard runner
Expand Down Expand Up @@ -324,7 +316,6 @@ impl ShardManager {
}

drop(self.shard_queuer.unbounded_send(ShardQueuerMessage::Shutdown));
drop(self.monitor_tx.unbounded_send(ShardManagerMessage::ShutdownInitiated));
}

#[instrument(skip(self))]
Expand All @@ -341,6 +332,37 @@ impl ShardManager {
pub fn intents(&self) -> GatewayIntents {
self.gateway_intents
}

pub async fn return_with_value(&mut self, ret: Result<(), GatewayError>) {
if let Err(e) = self.return_value_tx.send(ret).await {
tracing::warn!("failed to send return value: {}", e);
}
}

pub fn shutdown_finished(&self, id: ShardId) {
if let Err(e) = self.shard_shutdown_send.unbounded_send(id) {
tracing::warn!("failed to notify about finished shutdown: {}", e);
}
}

pub async fn restart_shard(&mut self, id: ShardId) {
self.restart(id).await;
if let Err(e) = self.shard_shutdown_send.unbounded_send(id) {
tracing::warn!("failed to notify about finished shutdown: {}", e);
}
}

pub async fn shard_update(
&self,
id: ShardId,
latency: Option<Duration>,
stage: ConnectionStage,
) {
if let Some(runner) = self.runners.lock().await.get_mut(&id) {
runner.latency = latency;
runner.stage = stage;
}
}
}

impl Drop for ShardManager {
Expand All @@ -352,7 +374,6 @@ impl Drop for ShardManager {
/// [`ShardRunner`]: super::ShardRunner
fn drop(&mut self) {
drop(self.shard_queuer.unbounded_send(ShardQueuerMessage::Shutdown));
drop(self.monitor_tx.unbounded_send(ShardManagerMessage::ShutdownInitiated));
}
}

Expand Down
112 changes: 0 additions & 112 deletions src/client/bridge/gateway/shard_manager_monitor.rs

This file was deleted.

Loading