diff --git a/Cargo.toml b/Cargo.toml index b0198125a..526a16c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,8 +44,8 @@ tokio-tungstenite = { optional = true, version = "0.18" } tokio-util = { features = ["io"], optional = true, version = "0.7" } tracing = { version = "0.1", features = ["log"] } tracing-futures = "0.2" -twilight-gateway = { default-features = false, optional = true, version = "0.14.0" } -twilight-model = { default-features = false, optional = true, version = "0.14.0" } +twilight-gateway = { default-features = false, optional = true, version = "0.15.0" } +twilight-model = { default-features = false, optional = true, version = "0.15.0" } typemap_rev = { optional = true, version = "0.3" } url = { optional = true, version = "2" } uuid = { features = ["v4"], optional = true, version = "1" } diff --git a/examples/twilight/Cargo.toml b/examples/twilight/Cargo.toml index efb2bfd61..b5f662b63 100644 --- a/examples/twilight/Cargo.toml +++ b/examples/twilight/Cargo.toml @@ -11,10 +11,10 @@ symphonia = { features = ["aac", "mp3", "isomp4", "alac"], version = "0.5.2" } tracing = "0.1" tracing-subscriber = "0.2" tokio = { features = ["macros", "rt-multi-thread", "sync"], version = "1" } -twilight-gateway = "0.14" -twilight-http = "0.14" -twilight-model = "0.14" -twilight-standby = "0.14" +twilight-gateway = "0.15" +twilight-http = "0.15" +twilight-model = "0.15" +twilight-standby = "0.15" [dependencies.songbird] default-features = false diff --git a/examples/twilight/src/main.rs b/examples/twilight/src/main.rs index 067aea0ce..82b4ad7c6 100644 --- a/examples/twilight/src/main.rs +++ b/examples/twilight/src/main.rs @@ -23,12 +23,18 @@ use futures::StreamExt; use songbird::{ input::{Compose, YoutubeDl}, + shards::TwilightMap, tracks::{PlayMode, TrackHandle}, Songbird, }; use std::{collections::HashMap, env, error::Error, future::Future, num::NonZeroU64, sync::Arc}; use tokio::sync::RwLock; -use twilight_gateway::{Cluster, Event, Intents}; +use twilight_gateway::{ + stream::{self, ShardEventStream}, + Event, + Intents, + Shard, +}; use twilight_http::Client as HttpClient; use twilight_model::{ channel::Message, @@ -62,21 +68,32 @@ async fn main() -> Result<(), Box> { // Initialize the tracing subscriber. tracing_subscriber::fmt::init(); - let (mut events, state) = { + let (mut shards, state) = { let token = env::var("DISCORD_TOKEN")?; let http = HttpClient::new(token.clone()); let user_id = http.current_user().await?.model().await?.id; let intents = - Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT | Intents::GUILD_VOICE_STATES; - let (cluster, events) = Cluster::new(token, intents).await?; - cluster.up().await; + Intents::GUILD_MESSAGES | Intents::GUILD_VOICE_STATES | Intents::MESSAGE_CONTENT; + let config = twilight_gateway::Config::new(token.clone(), intents); + + let shards: Vec = + stream::create_recommended(&http, config, |_, builder| builder.build()) + .await? + .collect(); + + let senders = TwilightMap::new( + shards + .iter() + .map(|s| (s.id().number(), s.sender())) + .collect(), + ); - let songbird = Songbird::twilight(Arc::new(cluster), user_id); + let songbird = Songbird::twilight(Arc::new(senders), user_id); ( - events, + shards, Arc::new(StateRef { http, trackdata: Default::default(), @@ -86,7 +103,22 @@ async fn main() -> Result<(), Box> { ) }; - while let Some((_, event)) = events.next().await { + let mut stream = ShardEventStream::new(shards.iter_mut()); + loop { + let event = match stream.next().await { + Some((_, Ok(event))) => event, + Some((_, Err(source))) => { + tracing::warn!(?source, "error receiving event"); + + if source.is_fatal() { + break; + } + + continue; + }, + None => break, + }; + state.standby.process(&event); state.songbird.process(&event).await; diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 4045978b6..229fc22f6 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -30,6 +30,8 @@ pub use mix_mode::MixMode; #[cfg(test)] pub use test_config::*; +#[cfg(feature = "builtin-queue")] +use crate::tracks; #[cfg(feature = "builtin-queue")] use crate::tracks::TrackQueue; use crate::{ diff --git a/src/error.rs b/src/error.rs index 17769224a..54a8bc864 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,7 +11,7 @@ pub use simd_json::Error as JsonError; #[cfg(feature = "gateway")] use std::{error::Error, fmt}; #[cfg(feature = "twilight")] -use twilight_gateway::{cluster::ClusterCommandError, shard::CommandError}; +use twilight_gateway::error::SendError; #[cfg(feature = "gateway")] #[derive(Debug)] @@ -50,11 +50,8 @@ pub enum JoinError { /// Serenity-specific WebSocket send error. Serenity(TrySendError), #[cfg(feature = "twilight")] - /// Twilight-specific WebSocket send error returned when using a shard cluster. - TwilightCluster(ClusterCommandError), - #[cfg(feature = "twilight")] - /// Twilight-specific WebSocket send error when explicitly using a single shard. - TwilightShard(CommandError), + /// Twilight-specific WebSocket send error when a message fails to send over websocket. + Twilight(SendError), } #[cfg(feature = "gateway")] @@ -96,9 +93,7 @@ impl fmt::Display for JoinError { #[cfg(feature = "serenity")] JoinError::Serenity(e) => e.fmt(f), #[cfg(feature = "twilight")] - JoinError::TwilightCluster(e) => e.fmt(f), - #[cfg(feature = "twilight")] - JoinError::TwilightShard(e) => e.fmt(f), + JoinError::Twilight(e) => e.fmt(f), } } } @@ -116,9 +111,7 @@ impl Error for JoinError { #[cfg(feature = "serenity")] JoinError::Serenity(e) => e.source(), #[cfg(feature = "twilight")] - JoinError::TwilightCluster(e) => e.source(), - #[cfg(feature = "twilight")] - JoinError::TwilightShard(e) => e.source(), + JoinError::Twilight(e) => e.source(), } } } @@ -131,16 +124,9 @@ impl From> for JoinError { } #[cfg(all(feature = "twilight", feature = "gateway"))] -impl From for JoinError { - fn from(e: CommandError) -> Self { - JoinError::TwilightShard(e) - } -} - -#[cfg(all(feature = "twilight", feature = "gateway"))] -impl From for JoinError { - fn from(e: ClusterCommandError) -> Self { - JoinError::TwilightCluster(e) +impl From for JoinError { + fn from(e: SendError) -> Self { + JoinError::Twilight(e) } } diff --git a/src/manager.rs b/src/manager.rs index 40d8d4151..77bc09523 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -26,10 +26,9 @@ use serenity::{ }; use std::sync::Arc; use tokio::sync::Mutex; +#[cfg(feature = "serenity")] use tracing::debug; #[cfg(feature = "twilight")] -use twilight_gateway::Cluster; -#[cfg(feature = "twilight")] use twilight_model::gateway::event::Event as TwilightEvent; #[derive(Clone, Copy, Debug)] @@ -88,7 +87,7 @@ impl Songbird { /// [`process`]. /// /// [`process`]: Songbird::process - pub fn twilight(cluster: Arc, user_id: U) -> Self + pub fn twilight(cluster: Arc, user_id: U) -> Self where U: Into, { @@ -103,17 +102,21 @@ impl Songbird { /// [`process`]. /// /// [`process`]: Songbird::process - pub fn twilight_from_config(cluster: Arc, user_id: U, config: Config) -> Self + pub fn twilight_from_config( + sender_map: Arc, + user_id: U, + config: Config, + ) -> Self where U: Into, { Self { client_data: OnceCell::with_value(ClientData { - shard_count: cluster.config().shard_scheme().total(), + shard_count: sender_map.shard_count(), user_id: user_id.into(), }), calls: DashMap::new(), - sharder: Sharder::TwilightCluster(cluster), + sharder: Sharder::Twilight(sender_map), config: config.initialise_disposer().into(), } } @@ -370,8 +373,8 @@ impl Songbird { pub async fn process(&self, event: &TwilightEvent) { match event { TwilightEvent::VoiceServerUpdate(v) => { - let id = GuildId::from(v.guild_id); - let call = self.get(id); + let guild_id = GuildId::from(v.guild_id); + let call = self.get(guild_id); if let Some(call) = call { let mut handler = call.lock().await; diff --git a/src/shards.rs b/src/shards.rs index 9185b232b..6e071367c 100644 --- a/src/shards.rs +++ b/src/shards.rs @@ -9,18 +9,49 @@ use derivative::Derivative; use futures::channel::mpsc::{TrySendError, UnboundedSender as Sender}; #[cfg(feature = "serenity")] use parking_lot::{lock_api::RwLockWriteGuard, Mutex as PMutex, RwLock as PRwLock}; +#[cfg(feature = "serenity")] use serde_json::json; #[cfg(feature = "serenity")] use serenity::gateway::InterMessage; #[cfg(feature = "serenity")] use std::result::Result as StdResult; use std::sync::Arc; +#[cfg(feature = "serenity")] use tracing::{debug, error}; #[cfg(feature = "twilight")] -use twilight_gateway::{Cluster, Shard as TwilightShard}; +use twilight_gateway::MessageSender; #[cfg(feature = "twilight")] use twilight_model::gateway::payload::outgoing::update_voice_state::UpdateVoiceState as TwilightVoiceState; +/// Map containing [`MessageSender`]s for Twilight. +/// +/// [`MessageSender`]: twilight_gateway::MessageSender +#[cfg(feature = "twilight")] +#[derive(Debug)] +pub struct TwilightMap { + map: std::collections::HashMap, +} + +#[cfg(feature = "twilight")] +impl TwilightMap { + /// Construct a map of shards and command senders to those shards. + /// + /// For correctness all shards should be in the map. + pub fn new(map: std::collections::HashMap) -> Self { + TwilightMap { map } + } + + /// Get the message sender for `shard_id`. + pub fn get(&self, shard_id: u64) -> Option<&MessageSender> { + self.map.get(&shard_id) + } + + /// Get the total number of shards in the map. + pub fn shard_count(&self) -> u64 { + self.map.len() as u64 + } +} + #[derive(Derivative)] #[derivative(Debug)] #[non_exhaustive] @@ -30,11 +61,8 @@ pub enum Sharder { /// Serenity-specific wrapper for sharder state initialised by the library. Serenity(SerenitySharder), #[cfg(feature = "twilight")] - /// Twilight-specific wrapper for sharder state initialised by the user. - TwilightCluster(Arc), - #[cfg(feature = "twilight")] - /// Twilight-specific wrapper for a single shard initialised by the user. - TwilightShard(Arc), + /// Twilight-specific wrapper for a map of command senders. + Twilight(Arc), /// A generic shard handle source. Generic(#[derivative(Debug = "ignore")] Arc), } @@ -59,9 +87,7 @@ impl Sharder { s.get_or_insert_shard_handle(shard_id as u32), )), #[cfg(feature = "twilight")] - Sharder::TwilightCluster(t) => Some(Shard::TwilightCluster(t.clone(), shard_id)), - #[cfg(feature = "twilight")] - Sharder::TwilightShard(t) => Some(Shard::TwilightShard(t.clone())), + Sharder::Twilight(t) => Some(Shard::Twilight(t.clone(), shard_id)), Sharder::Generic(src) => src.get_shard(shard_id).map(Shard::Generic), } } @@ -126,11 +152,8 @@ pub enum Shard { /// Handle to one of serenity's shard runners. Serenity(Arc), #[cfg(feature = "twilight")] - /// Handle to a twilight shard spawned from a cluster. - TwilightCluster(Arc, u64), - #[cfg(feature = "twilight")] - /// Handle to a twilight shard spawned from a cluster. - TwilightShard(Arc), + /// Handle to a map of twilight command senders. + Twilight(Arc, u64), /// Handle to a generic shard instance. Generic(#[derivative(Debug = "ignore")] Arc), } @@ -161,17 +184,13 @@ impl VoiceUpdate for Shard { Ok(()) }, #[cfg(feature = "twilight")] - Shard::TwilightCluster(handle, shard_id) => { - let channel_id = channel_id.map(|c| c.0).map(From::from); - let cmd = TwilightVoiceState::new(guild_id.0, channel_id, self_deaf, self_mute); - handle.command(*shard_id, &cmd).await?; - Ok(()) - }, - #[cfg(feature = "twilight")] - Shard::TwilightShard(handle) => { + Shard::Twilight(map, shard_id) => { let channel_id = channel_id.map(|c| c.0).map(From::from); let cmd = TwilightVoiceState::new(guild_id.0, channel_id, self_deaf, self_mute); - handle.command(&cmd).await?; + let sender = map + .get(*shard_id) + .ok_or(crate::error::JoinError::NoSender)?; + sender.command(&cmd)?; Ok(()) }, Shard::Generic(g) =>