From f05b7414a0ec52404019dce9530b380d71e41f3b Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Wed, 6 Jan 2021 13:01:14 +0000 Subject: [PATCH] Songbird: Tokio 1.0 (#36) Migrates to the new version of tokio, requiring channel and sleep changes in a few locations. Additionally points to the in-tree v0.3 version of twilight. --- Cargo.toml | 17 +++++++++++------ examples/serenity/voice/Cargo.toml | 4 ++-- examples/serenity/voice_events_queue/Cargo.toml | 4 ++-- examples/serenity/voice_receive/Cargo.toml | 9 +++++---- examples/serenity/voice_receive/src/main.rs | 2 ++ examples/serenity/voice_storage/Cargo.toml | 4 ++-- examples/twilight/Cargo.toml | 10 +++++----- src/driver/connection/mod.rs | 8 +++++--- src/driver/tasks/udp_rx.rs | 8 ++++---- src/driver/tasks/udp_tx.rs | 9 +++++---- src/driver/tasks/ws.rs | 2 +- src/tracks/command.rs | 4 ++-- src/tracks/handle.rs | 11 ++++++----- src/tracks/mod.rs | 14 +++++--------- 14 files changed, 57 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ea5bc16ff..0163a3fe0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ version = "0.1" default-features = false features = ["tokio-runtime"] optional = true -version = "0.9" +version = "0.11" [dependencies.audiopus] optional = true @@ -58,6 +58,7 @@ version = "0.8" [dependencies.serenity] optional = true +#version = "0.10" default-features = false features = ["voice", "gateway"] git = "https://github.com/serenity-rs/serenity" @@ -65,6 +66,7 @@ branch = "current" [dependencies.serenity-voice-model] optional = true +#version = "0.10" git = "https://github.com/serenity-rs/serenity" branch = "current" @@ -78,18 +80,22 @@ version = "0.1" [dependencies.tokio] optional = true -version = "0.2" +version = "1.0" default-features = false [dependencies.twilight-gateway] optional = true -version = "0.2" +#version = "0.3" default-features = false +git = "https://github.com/twilight-rs/twilight" +branch = "v0.3" [dependencies.twilight-model] optional = true -version = "0.2" +#version = "0.3" default-features = false +git = "https://github.com/twilight-rs/twilight" +branch = "v0.3" [dependencies.typemap_rev] optional = true @@ -135,13 +141,12 @@ driver = [ "serenity-voice-model", "spin_sleep", "streamcatcher", - "tokio/blocking", "tokio/fs", "tokio/io-util", "tokio/macros", "tokio/net", "tokio/process", - "tokio/rt-core", + "tokio/rt", "tokio/sync", "tokio/time", "typemap_rev", diff --git a/examples/serenity/voice/Cargo.toml b/examples/serenity/voice/Cargo.toml index 0986650a9..d6f510b8b 100644 --- a/examples/serenity/voice/Cargo.toml +++ b/examples/serenity/voice/Cargo.toml @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity" branch = "current" [dependencies.tokio] -version = "0.2" -features = ["macros"] +version = "1.0" +features = ["macros", "rt-multi-thread"] diff --git a/examples/serenity/voice_events_queue/Cargo.toml b/examples/serenity/voice_events_queue/Cargo.toml index 341063044..a646e3f9a 100644 --- a/examples/serenity/voice_events_queue/Cargo.toml +++ b/examples/serenity/voice_events_queue/Cargo.toml @@ -19,5 +19,5 @@ git = "https://github.com/serenity-rs/serenity" branch = "current" [dependencies.tokio] -version = "0.2" -features = ["macros"] +version = "1.0" +features = ["macros", "rt-multi-thread"] diff --git a/examples/serenity/voice_receive/Cargo.toml b/examples/serenity/voice_receive/Cargo.toml index 172d9b107..171d6b4ab 100644 --- a/examples/serenity/voice_receive/Cargo.toml +++ b/examples/serenity/voice_receive/Cargo.toml @@ -5,8 +5,9 @@ authors = ["my name "] edition = "2018" [dependencies] -env_logger = "~0.6" -log = "~0.4" +tracing = "0.1" +tracing-subscriber = "0.2" +tracing-futures = "0.2" [dependencies.songbird] path = "../../../" @@ -17,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity" branch = "current" [dependencies.tokio] -version = "0.2" -features = ["macros"] +version = "1.0" +features = ["macros", "rt-multi-thread"] diff --git a/examples/serenity/voice_receive/src/main.rs b/examples/serenity/voice_receive/src/main.rs index f2d37918f..2249c3215 100644 --- a/examples/serenity/voice_receive/src/main.rs +++ b/examples/serenity/voice_receive/src/main.rs @@ -153,6 +153,8 @@ struct General; #[tokio::main] async fn main() { + tracing_subscriber::fmt::init(); + // Configure the client with your Discord bot token in the environment. let token = env::var("DISCORD_TOKEN") .expect("Expected a token in the environment"); diff --git a/examples/serenity/voice_storage/Cargo.toml b/examples/serenity/voice_storage/Cargo.toml index 0894e73e4..3362c99b3 100644 --- a/examples/serenity/voice_storage/Cargo.toml +++ b/examples/serenity/voice_storage/Cargo.toml @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity" branch = "current" [dependencies.tokio] -version = "0.2" -features = ["macros"] +version = "1.0" +features = ["macros", "rt-multi-thread"] diff --git a/examples/twilight/Cargo.toml b/examples/twilight/Cargo.toml index 4b47bad26..1ae206b82 100644 --- a/examples/twilight/Cargo.toml +++ b/examples/twilight/Cargo.toml @@ -9,11 +9,11 @@ futures = "0.3" tracing = "0.1" tracing-subscriber = "0.2" serde_json = { version = "1" } -tokio = { features = ["macros", "rt-threaded", "sync"], version = "0.2" } -twilight-gateway = "0.2" -twilight-http = "0.2" -twilight-model = "0.2" -twilight-standby = "0.2" +tokio = { features = ["macros", "rt-multi-thread", "sync"], version = "1" } +twilight-gateway = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" } +twilight-http = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" } +twilight-model = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" } +twilight-standby = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" } [dependencies.songbird] path = "../.." diff --git a/src/driver/connection/mod.rs b/src/driver/connection/mod.rs index b2b8190fe..3563559cc 100644 --- a/src/driver/connection/mod.rs +++ b/src/driver/connection/mod.rs @@ -18,7 +18,7 @@ use crate::{ use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket}; use error::{Error, Result}; use flume::Sender; -use std::{net::IpAddr, str::FromStr}; +use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::net::UdpSocket; use tracing::{debug, info, instrument}; use url::Url; @@ -97,7 +97,7 @@ impl Connection { return Err(Error::CryptoModeUnavailable); } - let mut udp = UdpSocket::bind("0.0.0.0:0").await?; + let udp = UdpSocket::bind("0.0.0.0:0").await?; udp.connect((ready.ip, ready.port)).await?; // Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed. @@ -161,7 +161,9 @@ impl Connection { let (ws_msg_tx, ws_msg_rx) = flume::unbounded(); let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded(); let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded(); - let (udp_rx, udp_tx) = udp.split(); + + let udp_rx = Arc::new(udp); + let udp_tx = Arc::clone(&udp_rx); let ssrc = ready.ssrc; diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index 2099e5d26..2ead05483 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -20,8 +20,8 @@ use discortp::{ PacketSize, }; use flume::Receiver; -use std::collections::HashMap; -use tokio::net::udp::RecvHalf; +use std::{collections::HashMap, sync::Arc}; +use tokio::net::UdpSocket; use tracing::{error, info, instrument, warn}; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; @@ -236,7 +236,7 @@ struct UdpRx { config: Config, packet_buffer: [u8; VOICE_PACKET_MAX], rx: Receiver, - udp_socket: RecvHalf, + udp_socket: Arc, } impl UdpRx { @@ -391,7 +391,7 @@ pub(crate) async fn runner( rx: Receiver, cipher: Cipher, config: Config, - udp_socket: RecvHalf, + udp_socket: Arc, ) { info!("UDP receive handle started."); diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs index 7027a0970..ac3962e95 100644 --- a/src/driver/tasks/udp_tx.rs +++ b/src/driver/tasks/udp_tx.rs @@ -2,14 +2,15 @@ use super::message::*; use crate::constants::*; use discortp::discord::MutableKeepalivePacket; use flume::Receiver; +use std::sync::Arc; use tokio::{ - net::udp::SendHalf, - time::{timeout_at, Elapsed, Instant}, + net::UdpSocket, + time::{timeout_at, Instant}, }; use tracing::{error, info, instrument, trace}; #[instrument(skip(udp_msg_rx))] -pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, mut udp_tx: SendHalf) { +pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, udp_tx: Arc) { info!("UDP transmit handle started."); let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()]; @@ -22,7 +23,7 @@ pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, mut ud loop { use UdpTxMessage::*; match timeout_at(ka_time, udp_msg_rx.recv_async()).await { - Err(Elapsed { .. }) => { + Err(_) => { trace!("Sending UDP Keepalive."); if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await { error!("Fatal UDP keepalive send error: {:?}.", e); diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index 9b407f3ea..7d8de5fb1 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -57,7 +57,7 @@ impl AuxNetwork { let mut ws_error = false; let mut should_reconnect = false; - let hb = time::delay_until(next_heartbeat); + let hb = time::sleep_until(next_heartbeat); tokio::select! { _ = hb => { diff --git a/src/tracks/command.rs b/src/tracks/command.rs index a80b28670..5883199e9 100644 --- a/src/tracks/command.rs +++ b/src/tracks/command.rs @@ -1,7 +1,7 @@ use super::*; use crate::events::EventData; +use flume::Sender; use std::time::Duration; -use tokio::sync::oneshot::Sender as OneshotSender; /// A request from external code using a [`TrackHandle`] to modify /// or act upon an [`Track`] object. @@ -27,7 +27,7 @@ pub enum TrackCommand { /// Run some closure on this track, with direct access to the core object. Do(Box), /// Request a read-only view of this track's state. - Request(OneshotSender>), + Request(Sender>), /// Change the loop count/strategy of this track. Loop(LoopState), /// Prompts a track's input to become live and usable, if it is not already. diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index 2800cc979..8b1a999d5 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -3,8 +3,9 @@ use crate::{ events::{Event, EventData, EventHandler}, input::Metadata, }; +use flume::Sender; use std::{fmt, sync::Arc, time::Duration}; -use tokio::sync::{mpsc::UnboundedSender, oneshot, RwLock}; +use tokio::sync::RwLock; use typemap_rev::TypeMap; use uuid::Uuid; @@ -25,7 +26,7 @@ pub struct TrackHandle { } struct InnerHandle { - command_channel: UnboundedSender, + command_channel: Sender, seekable: bool, uuid: Uuid, metadata: Box, @@ -50,7 +51,7 @@ impl TrackHandle { /// /// [`Input`]: crate::input::Input pub fn new( - command_channel: UnboundedSender, + command_channel: Sender, seekable: bool, uuid: Uuid, metadata: Box, @@ -159,10 +160,10 @@ impl TrackHandle { /// Request playback information and state from the audio context. pub async fn get_info(&self) -> TrackResult> { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = flume::bounded(1); self.send(TrackCommand::Request(tx))?; - rx.await.map_err(|_| TrackError::Finished) + rx.recv_async().await.map_err(|_| TrackError::Finished) } /// Set an audio track to loop indefinitely. diff --git a/src/tracks/mod.rs b/src/tracks/mod.rs index 911729e8c..580ab57be 100644 --- a/src/tracks/mod.rs +++ b/src/tracks/mod.rs @@ -25,8 +25,8 @@ mod state; pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*}; use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input}; +use flume::{Receiver, TryRecvError}; use std::time::Duration; -use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver}; use uuid::Uuid; /// Control object for audio playback. @@ -102,7 +102,7 @@ pub struct Track { /// Track commands are sent in this manner to ensure that access /// occurs in a thread-safe manner, without allowing any external /// code to lock access to audio objects and block packet generation. - pub(crate) commands: UnboundedReceiver, + pub(crate) commands: Receiver, /// Handle for safe control of this audio track from other threads. /// @@ -124,11 +124,7 @@ impl Track { /// In general, you should probably use [`create_player`]. /// /// [`create_player`]: fn.create_player.html - pub fn new_raw( - source: Input, - commands: UnboundedReceiver, - handle: TrackHandle, - ) -> Self { + pub fn new_raw(source: Input, commands: Receiver, handle: TrackHandle) -> Self { let uuid = handle.uuid(); Self { @@ -310,7 +306,7 @@ impl Track { MakePlayable => self.make_playable(), } }, - Err(TryRecvError::Closed) => { + Err(TryRecvError::Disconnected) => { // this branch will never be visited. break; }, @@ -389,7 +385,7 @@ pub fn create_player(source: Input) -> (Track, TrackHandle) { /// [`Track`]: Track /// [`TrackHandle`]: TrackHandle pub fn create_player_with_uuid(source: Input, uuid: Uuid) -> (Track, TrackHandle) { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = flume::unbounded(); let can_seek = source.is_seekable(); let metadata = source.metadata.clone(); let handle = TrackHandle::new(tx, can_seek, uuid, metadata);