From 39974ba00b05c685e03e6051c2f32a524c2fc2ac Mon Sep 17 00:00:00 2001 From: Lost Date: Tue, 19 Oct 2021 18:02:40 -0400 Subject: [PATCH] rewrite frame interceptor --- .../main/kotlin/DefaultFrameInterceptor.kt | 61 +++++++++++++++ voice/src/main/kotlin/FrameInterceptor.kt | 74 +------------------ voice/src/main/kotlin/VoiceConnection.kt | 4 +- .../src/main/kotlin/VoiceConnectionBuilder.kt | 46 +++++++----- .../kotlin/handlers/UdpLifeCycleHandler.kt | 9 +-- voice/src/main/kotlin/udp/AudioFrameSender.kt | 11 +-- .../kotlin/udp/DefaultAudioFrameSender.kt | 36 ++++----- 7 files changed, 116 insertions(+), 125 deletions(-) create mode 100644 voice/src/main/kotlin/DefaultFrameInterceptor.kt diff --git a/voice/src/main/kotlin/DefaultFrameInterceptor.kt b/voice/src/main/kotlin/DefaultFrameInterceptor.kt new file mode 100644 index 000000000000..93d133d07b2e --- /dev/null +++ b/voice/src/main/kotlin/DefaultFrameInterceptor.kt @@ -0,0 +1,61 @@ +package dev.kord.voice + +import dev.kord.common.annotation.KordVoice +import dev.kord.voice.gateway.SendSpeaking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach + +@KordVoice +/** + * Data that is used to configure for the lifetime of a [DefaultFrameInterceptor]. + * + * @param speakingState the [SpeakingFlags] to be sent when there audio being sent. By default, it is [microphone-only][SpeakingFlag.Microphone]. + */ +public data class DefaultFrameInterceptorData( + val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone } +) + +private const val FRAMES_OF_SILENCE_TO_PLAY = 5 + +@KordVoice +/** + * The default implementation for [FrameInterceptor]. + * Any custom implementation should extend this and call the super [intercept] method, or else + * the speaking flags will not be sent! + * + * @param data the data to configure this instance with. + */ +public class DefaultFrameInterceptor(private val data: DefaultFrameInterceptorData = DefaultFrameInterceptorData()) : + FrameInterceptor { + override fun Flow.intercept(configuration: FrameInterceptorConfiguration): Flow { + var framesOfSilence = 5 + var isSpeaking = false + + suspend fun startSpeaking() { + isSpeaking = true + configuration.voiceGateway.send(SendSpeaking(data.speakingState, 0, configuration.ssrc)) + } + + suspend fun stopSpeaking() { + isSpeaking = false + configuration.voiceGateway.send(SendSpeaking(SpeakingFlags(0), 0, configuration.ssrc)) + } + + return map { frame -> + when (framesOfSilence) { + 0 -> frame + else -> frame ?: AudioFrame.SILENCE + } + }.onEach { frame -> + if (frame != null && !isSpeaking) { + startSpeaking() + } else if (frame == null) { + if (--framesOfSilence == 0) + stopSpeaking() + } else { + framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY + } + } + } +} \ No newline at end of file diff --git a/voice/src/main/kotlin/FrameInterceptor.kt b/voice/src/main/kotlin/FrameInterceptor.kt index 8b5874345e48..f279d56ba47d 100644 --- a/voice/src/main/kotlin/FrameInterceptor.kt +++ b/voice/src/main/kotlin/FrameInterceptor.kt @@ -2,35 +2,16 @@ package dev.kord.voice import dev.kord.common.annotation.KordVoice import dev.kord.gateway.Gateway -import dev.kord.voice.gateway.SendSpeaking import dev.kord.voice.gateway.VoiceGateway -import kotlin.properties.Delegates +import kotlinx.coroutines.flow.Flow -/** - * Variables that are accessible to any FrameInterceptor through the [VoiceConnection.frameInterceptorFactory]. - * - * @param gateway the gateway that handles the guild this voice connection is connected to. - * @param voiceGateway the underlying [VoiceGateway]. - * @param ssrc the current SSRC retrieved from Discord. - */ @KordVoice -public data class FrameInterceptorContext( +public data class FrameInterceptorConfiguration( val gateway: Gateway, val voiceGateway: VoiceGateway, - val ssrc: UInt, + val ssrc: UInt ) -@KordVoice -public class FrameInterceptorContextBuilder(public var gateway: Gateway, public var voiceGateway: VoiceGateway) { - public var ssrc: UInt by Delegates.notNull() - - public fun build(): FrameInterceptorContext = FrameInterceptorContext(gateway, voiceGateway, ssrc) -} - -@KordVoice -internal inline fun FrameInterceptorContext(gateway: Gateway, voiceGateway: VoiceGateway, builder: FrameInterceptorContextBuilder.() -> Unit) = - FrameInterceptorContextBuilder(gateway, voiceGateway).apply(builder).build() - /** * An interceptor for audio frames before they are sent as packets. * @@ -38,52 +19,5 @@ internal inline fun FrameInterceptorContext(gateway: Gateway, voiceGateway: Voic */ @KordVoice public fun interface FrameInterceptor { - public suspend fun intercept(frame: AudioFrame?): AudioFrame? -} - -private const val FRAMES_OF_SILENCE_TO_PLAY = 5 - -/** - * The default implementation for [FrameInterceptor]. - * Any custom implementation should extend this and call the super [intercept] method, or else - * the speaking flags will not be sent! - * - * @param context the context for this interceptor. - * @param speakingState the speaking state that will be used when there is audio data to be sent. By default, it is microphone-only. - */ -@KordVoice -public open class DefaultFrameInterceptor( - protected val context: FrameInterceptorContext, - private val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone } -) : FrameInterceptor { - private val voiceGateway = context.voiceGateway - - private var framesOfSilence = 5 - private var isSpeaking = false - - private val nowSpeaking = SendSpeaking(speakingState, 0, context.ssrc) - private val notSpeaking = SendSpeaking(SpeakingFlags(0), 0, context.ssrc) - - override suspend fun intercept(frame: AudioFrame?): AudioFrame? { - if (frame != null || framesOfSilence > 0) { // is there something to process - if (!isSpeaking && frame != null) { // if there is audio make sure we are speaking - isSpeaking = true - voiceGateway.send(nowSpeaking) - } - - if (frame == null) { // if we don't have audio then make sure we know that we are sending a frame of silence - if (--framesOfSilence == 0) { // we're done with frames of silence if we hit zero - isSpeaking = false - voiceGateway.send(notSpeaking) - } - } - else if (framesOfSilence != FRAMES_OF_SILENCE_TO_PLAY) { - framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY // we're playing audio, lets reset the frames of silence. - } - - return frame ?: AudioFrame.SILENCE - } - - return frame - } + public fun Flow.intercept(configuration: FrameInterceptorConfiguration): Flow } \ No newline at end of file diff --git a/voice/src/main/kotlin/VoiceConnection.kt b/voice/src/main/kotlin/VoiceConnection.kt index 297be7840e76..62c7e4e6721b 100644 --- a/voice/src/main/kotlin/VoiceConnection.kt +++ b/voice/src/main/kotlin/VoiceConnection.kt @@ -39,9 +39,9 @@ public data class VoiceConnectionData( * @param data the data representing this [VoiceConnection]. * @param voiceGatewayConfiguration the configuration used on each new [connect] for the [voiceGateway]. * @param audioProvider a [AudioProvider] that will provide [AudioFrame] when required. + * @param frameInterceptor a [FrameInterceptor] that will intercept all outgoing [AudioFrame]s. * @param frameSender the [AudioFrameSender] that will handle the sending of audio packets. * @param nonceStrategy the [NonceStrategy] that is used during encryption of audio. - * @param frameInterceptorFactory a factory for [FrameInterceptor]s that is used whenever audio is ready to be sent. See [FrameInterceptor] and [DefaultFrameInterceptor]. */ @KordVoice public class VoiceConnection( @@ -52,9 +52,9 @@ public class VoiceConnection( public var voiceGatewayConfiguration: VoiceGatewayConfiguration, public val streams: Streams, public val audioProvider: AudioProvider, + public val frameInterceptor: FrameInterceptor, public val frameSender: AudioFrameSender, public val nonceStrategy: NonceStrategy, - public val frameInterceptorFactory: (FrameInterceptorContext) -> FrameInterceptor, ) { public val scope: CoroutineScope = CoroutineScope(SupervisorJob() + CoroutineName("kord-voice-connection[${data.guildId.value}]")) diff --git a/voice/src/main/kotlin/VoiceConnectionBuilder.kt b/voice/src/main/kotlin/VoiceConnectionBuilder.kt index 0b3d7ab0b76e..c6fbde89156a 100644 --- a/voice/src/main/kotlin/VoiceConnectionBuilder.kt +++ b/voice/src/main/kotlin/VoiceConnectionBuilder.kt @@ -42,6 +42,20 @@ public class VoiceConnectionBuilder( */ public var audioProvider: AudioProvider? = null + public fun audioProvider(provider: AudioProvider) { + this.audioProvider = provider + } + + /** + * The [FrameInterceptor] for this [VoiceConnection]. + * If `null`, [DefaultFrameInterceptor] will be used. + */ + public var frameInterceptor: FrameInterceptor? = null + + public fun frameInterceptor(frameInterceptor: FrameInterceptor) { + this.frameInterceptor = frameInterceptor + } + /** * The [dev.kord.voice.udp.AudioFrameSender] for this [VoiceConnection]. If null, [dev.kord.voice.udp.DefaultAudioFrameSender] * will be used. @@ -54,21 +68,6 @@ public class VoiceConnectionBuilder( */ public var nonceStrategy: NonceStrategy? = null - public fun audioProvider(provider: AudioProvider) { - this.audioProvider = provider - } - - /** - * The [FrameInterceptor] factory for this [VoiceConnection]. - * When one is not set, a factory will be used to create the default interceptor, see [DefaultFrameInterceptor]. - * This factory will be used to create a new [FrameInterceptor] whenever audio is ready to be sent. - */ - public var frameInterceptorFactory: ((FrameInterceptorContext) -> FrameInterceptor)? = null - - public fun frameInterceptor(factory: (FrameInterceptorContext) -> FrameInterceptor) { - this.frameInterceptorFactory = factory - } - /** * A boolean indicating whether your voice state will be muted. */ @@ -160,10 +159,17 @@ public class VoiceConnectionBuilder( .build() val udpSocket = udpSocket ?: GlobalVoiceUdpSocket val audioProvider = audioProvider ?: EmptyAudioPlayerProvider - val audioSender = - audioSender ?: DefaultAudioFrameSender(DefaultAudioFrameSenderData(udpSocket)) val nonceStrategy = nonceStrategy ?: LiteNonceStrategy() - val frameInterceptorFactory = frameInterceptorFactory ?: { DefaultFrameInterceptor(it) } + val frameInterceptor = frameInterceptor ?: DefaultFrameInterceptor() + val audioSender = + audioSender ?: DefaultAudioFrameSender( + DefaultAudioFrameSenderData( + udpSocket, + frameInterceptor, + audioProvider, + nonceStrategy + ) + ) val streams = streams ?: if (receiveVoice) DefaultStreams(voiceGateway, udpSocket, nonceStrategy) else NOPStreams @@ -175,9 +181,9 @@ public class VoiceConnectionBuilder( initialGatewayConfiguration, streams, audioProvider, + frameInterceptor, audioSender, - nonceStrategy, - frameInterceptorFactory, + nonceStrategy ) } diff --git a/voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt b/voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt index 5dd0bd105ebd..b8a77ae4bc8e 100644 --- a/voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt +++ b/voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt @@ -4,7 +4,7 @@ package dev.kord.voice.handlers import dev.kord.common.annotation.KordVoice import dev.kord.voice.EncryptionMode -import dev.kord.voice.FrameInterceptorContextBuilder +import dev.kord.voice.FrameInterceptorConfiguration import dev.kord.voice.VoiceConnection import dev.kord.voice.encryption.strategies.LiteNonceStrategy import dev.kord.voice.encryption.strategies.NormalNonceStrategy @@ -64,11 +64,8 @@ internal class UdpLifeCycleHandler( val config = AudioFrameSenderConfiguration( ssrc = ssrc!!, key = it.secretKey.toUByteArray().toByteArray(), - nonceStrategy = nonceStrategy, - provider = audioProvider, - baseFrameInterceptorContext = FrameInterceptorContextBuilder(gateway, voiceGateway), - interceptorFactory = frameInterceptorFactory, - server = server!! + server = server!!, + interceptorConfiguration = FrameInterceptorConfiguration(gateway, voiceGateway, ssrc!!) ) audioSenderJob?.cancel() diff --git a/voice/src/main/kotlin/udp/AudioFrameSender.kt b/voice/src/main/kotlin/udp/AudioFrameSender.kt index 0b0ad7f00176..5c3aef3ca8c8 100644 --- a/voice/src/main/kotlin/udp/AudioFrameSender.kt +++ b/voice/src/main/kotlin/udp/AudioFrameSender.kt @@ -3,11 +3,7 @@ package dev.kord.voice.udp import dev.kord.common.annotation.KordVoice -import dev.kord.voice.AudioProvider -import dev.kord.voice.FrameInterceptor -import dev.kord.voice.FrameInterceptorContext -import dev.kord.voice.FrameInterceptorContextBuilder -import dev.kord.voice.encryption.strategies.NonceStrategy +import dev.kord.voice.FrameInterceptorConfiguration import io.ktor.util.network.* @KordVoice @@ -15,10 +11,7 @@ public data class AudioFrameSenderConfiguration( val server: NetworkAddress, val ssrc: UInt, val key: ByteArray, - val nonceStrategy: NonceStrategy, - val provider: AudioProvider, - val baseFrameInterceptorContext: FrameInterceptorContextBuilder, - val interceptorFactory: (FrameInterceptorContext) -> FrameInterceptor + val interceptorConfiguration: FrameInterceptorConfiguration ) @KordVoice diff --git a/voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt b/voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt index 1676adac11d4..1e7a1ab25651 100644 --- a/voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt +++ b/voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt @@ -2,11 +2,14 @@ package dev.kord.voice.udp import dev.kord.common.annotation.KordVoice import dev.kord.voice.AudioFrame +import dev.kord.voice.AudioProvider import dev.kord.voice.FrameInterceptor +import dev.kord.voice.encryption.strategies.NonceStrategy import io.ktor.network.sockets.* import io.ktor.utils.io.core.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import mu.KotlinLogging import kotlin.random.Random @@ -15,39 +18,36 @@ private val audioFrameSenderLogger = KotlinLogging.logger { } @KordVoice public data class DefaultAudioFrameSenderData( - val udp: VoiceUdpSocket + val udp: VoiceUdpSocket, + val interceptor: FrameInterceptor, + val provider: AudioProvider, + val nonceStrategy: NonceStrategy, ) @KordVoice public class DefaultAudioFrameSender( public val data: DefaultAudioFrameSenderData ) : AudioFrameSender { - private fun createFrameInterceptor(configuration: AudioFrameSenderConfiguration): FrameInterceptor = - with(configuration) { - val builder = baseFrameInterceptorContext - builder.ssrc = ssrc - return interceptorFactory(builder.build()) // we should assume that everything else is set before-hand in the base builder - } - override suspend fun start(configuration: AudioFrameSenderConfiguration): Unit = coroutineScope { - val interceptor: FrameInterceptor = createFrameInterceptor(configuration) var sequence: UShort = Random.nextBits(UShort.SIZE_BITS).toUShort() - val packetProvider = DefaultAudioPackerProvider(configuration.key, configuration.nonceStrategy) + val packetProvider = DefaultAudioPackerProvider(configuration.key, data.nonceStrategy) val frames = Channel(Channel.RENDEZVOUS) - with(configuration.provider) { launch { provideFrames(frames) } } + with(data.provider) { launch { provideFrames(frames) } } audioFrameSenderLogger.trace { "audio poller starting." } try { - for (frame in frames) { - val consumedFrame = interceptor.intercept(frame) ?: continue - val packet = packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, consumedFrame.data) - - data.udp.send(Datagram(ByteReadPacket(packet.data, packet.dataStart, packet.viewSize), configuration.server)) - - sequence++ + with(data.interceptor) { + frames.consumeAsFlow() + .intercept(configuration.interceptorConfiguration) + .filterNotNull() + .map { packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, it.data) } + .map { Datagram(ByteReadPacket(it.data, it.dataStart, it.viewSize), configuration.server) } + .onEach(data.udp::send) + .onEach { sequence++ } + .collect() } } catch (e: Exception) { audioFrameSenderLogger.trace(e) { "poller stopped with reason" }