Skip to content

Commit

Permalink
rewrite frame interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
lost-illusi0n committed Oct 21, 2021
1 parent f9ccb4c commit 39974ba
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 125 deletions.
61 changes: 61 additions & 0 deletions voice/src/main/kotlin/DefaultFrameInterceptor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dev.kord.voice

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.gateway.SendSpeaking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

@KordVoice
/**
* Data that is used to configure for the lifetime of a [DefaultFrameInterceptor].
*
* @param speakingState the [SpeakingFlags] to be sent when there audio being sent. By default, it is [microphone-only][SpeakingFlag.Microphone].
*/
public data class DefaultFrameInterceptorData(
val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone }
)

private const val FRAMES_OF_SILENCE_TO_PLAY = 5

@KordVoice
/**
* The default implementation for [FrameInterceptor].
* Any custom implementation should extend this and call the super [intercept] method, or else
* the speaking flags will not be sent!
*
* @param data the data to configure this instance with.
*/
public class DefaultFrameInterceptor(private val data: DefaultFrameInterceptorData = DefaultFrameInterceptorData()) :
FrameInterceptor {
override fun Flow<AudioFrame?>.intercept(configuration: FrameInterceptorConfiguration): Flow<AudioFrame?> {
var framesOfSilence = 5
var isSpeaking = false

suspend fun startSpeaking() {
isSpeaking = true
configuration.voiceGateway.send(SendSpeaking(data.speakingState, 0, configuration.ssrc))
}

suspend fun stopSpeaking() {
isSpeaking = false
configuration.voiceGateway.send(SendSpeaking(SpeakingFlags(0), 0, configuration.ssrc))
}

return map { frame ->
when (framesOfSilence) {
0 -> frame
else -> frame ?: AudioFrame.SILENCE
}
}.onEach { frame ->
if (frame != null && !isSpeaking) {
startSpeaking()
} else if (frame == null) {
if (--framesOfSilence == 0)
stopSpeaking()
} else {
framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY
}
}
}
}
74 changes: 4 additions & 70 deletions voice/src/main/kotlin/FrameInterceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,22 @@ package dev.kord.voice

import dev.kord.common.annotation.KordVoice
import dev.kord.gateway.Gateway
import dev.kord.voice.gateway.SendSpeaking
import dev.kord.voice.gateway.VoiceGateway
import kotlin.properties.Delegates
import kotlinx.coroutines.flow.Flow

/**
* Variables that are accessible to any FrameInterceptor through the [VoiceConnection.frameInterceptorFactory].
*
* @param gateway the gateway that handles the guild this voice connection is connected to.
* @param voiceGateway the underlying [VoiceGateway].
* @param ssrc the current SSRC retrieved from Discord.
*/
@KordVoice
public data class FrameInterceptorContext(
public data class FrameInterceptorConfiguration(
val gateway: Gateway,
val voiceGateway: VoiceGateway,
val ssrc: UInt,
val ssrc: UInt
)

@KordVoice
public class FrameInterceptorContextBuilder(public var gateway: Gateway, public var voiceGateway: VoiceGateway) {
public var ssrc: UInt by Delegates.notNull()

public fun build(): FrameInterceptorContext = FrameInterceptorContext(gateway, voiceGateway, ssrc)
}

@KordVoice
internal inline fun FrameInterceptorContext(gateway: Gateway, voiceGateway: VoiceGateway, builder: FrameInterceptorContextBuilder.() -> Unit) =
FrameInterceptorContextBuilder(gateway, voiceGateway).apply(builder).build()

/**
* An interceptor for audio frames before they are sent as packets.
*
* @see DefaultFrameInterceptor
*/
@KordVoice
public fun interface FrameInterceptor {
public suspend fun intercept(frame: AudioFrame?): AudioFrame?
}

private const val FRAMES_OF_SILENCE_TO_PLAY = 5

/**
* The default implementation for [FrameInterceptor].
* Any custom implementation should extend this and call the super [intercept] method, or else
* the speaking flags will not be sent!
*
* @param context the context for this interceptor.
* @param speakingState the speaking state that will be used when there is audio data to be sent. By default, it is microphone-only.
*/
@KordVoice
public open class DefaultFrameInterceptor(
protected val context: FrameInterceptorContext,
private val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone }
) : FrameInterceptor {
private val voiceGateway = context.voiceGateway

private var framesOfSilence = 5
private var isSpeaking = false

private val nowSpeaking = SendSpeaking(speakingState, 0, context.ssrc)
private val notSpeaking = SendSpeaking(SpeakingFlags(0), 0, context.ssrc)

override suspend fun intercept(frame: AudioFrame?): AudioFrame? {
if (frame != null || framesOfSilence > 0) { // is there something to process
if (!isSpeaking && frame != null) { // if there is audio make sure we are speaking
isSpeaking = true
voiceGateway.send(nowSpeaking)
}

if (frame == null) { // if we don't have audio then make sure we know that we are sending a frame of silence
if (--framesOfSilence == 0) { // we're done with frames of silence if we hit zero
isSpeaking = false
voiceGateway.send(notSpeaking)
}
}
else if (framesOfSilence != FRAMES_OF_SILENCE_TO_PLAY) {
framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY // we're playing audio, lets reset the frames of silence.
}

return frame ?: AudioFrame.SILENCE
}

return frame
}
public fun Flow<AudioFrame?>.intercept(configuration: FrameInterceptorConfiguration): Flow<AudioFrame?>
}
4 changes: 2 additions & 2 deletions voice/src/main/kotlin/VoiceConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public data class VoiceConnectionData(
* @param data the data representing this [VoiceConnection].
* @param voiceGatewayConfiguration the configuration used on each new [connect] for the [voiceGateway].
* @param audioProvider a [AudioProvider] that will provide [AudioFrame] when required.
* @param frameInterceptor a [FrameInterceptor] that will intercept all outgoing [AudioFrame]s.
* @param frameSender the [AudioFrameSender] that will handle the sending of audio packets.
* @param nonceStrategy the [NonceStrategy] that is used during encryption of audio.
* @param frameInterceptorFactory a factory for [FrameInterceptor]s that is used whenever audio is ready to be sent. See [FrameInterceptor] and [DefaultFrameInterceptor].
*/
@KordVoice
public class VoiceConnection(
Expand All @@ -52,9 +52,9 @@ public class VoiceConnection(
public var voiceGatewayConfiguration: VoiceGatewayConfiguration,
public val streams: Streams,
public val audioProvider: AudioProvider,
public val frameInterceptor: FrameInterceptor,
public val frameSender: AudioFrameSender,
public val nonceStrategy: NonceStrategy,
public val frameInterceptorFactory: (FrameInterceptorContext) -> FrameInterceptor,
) {
public val scope: CoroutineScope =
CoroutineScope(SupervisorJob() + CoroutineName("kord-voice-connection[${data.guildId.value}]"))
Expand Down
46 changes: 26 additions & 20 deletions voice/src/main/kotlin/VoiceConnectionBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ public class VoiceConnectionBuilder(
*/
public var audioProvider: AudioProvider? = null

public fun audioProvider(provider: AudioProvider) {
this.audioProvider = provider
}

/**
* The [FrameInterceptor] for this [VoiceConnection].
* If `null`, [DefaultFrameInterceptor] will be used.
*/
public var frameInterceptor: FrameInterceptor? = null

public fun frameInterceptor(frameInterceptor: FrameInterceptor) {
this.frameInterceptor = frameInterceptor
}

/**
* The [dev.kord.voice.udp.AudioFrameSender] for this [VoiceConnection]. If null, [dev.kord.voice.udp.DefaultAudioFrameSender]
* will be used.
Expand All @@ -54,21 +68,6 @@ public class VoiceConnectionBuilder(
*/
public var nonceStrategy: NonceStrategy? = null

public fun audioProvider(provider: AudioProvider) {
this.audioProvider = provider
}

/**
* The [FrameInterceptor] factory for this [VoiceConnection].
* When one is not set, a factory will be used to create the default interceptor, see [DefaultFrameInterceptor].
* This factory will be used to create a new [FrameInterceptor] whenever audio is ready to be sent.
*/
public var frameInterceptorFactory: ((FrameInterceptorContext) -> FrameInterceptor)? = null

public fun frameInterceptor(factory: (FrameInterceptorContext) -> FrameInterceptor) {
this.frameInterceptorFactory = factory
}

/**
* A boolean indicating whether your voice state will be muted.
*/
Expand Down Expand Up @@ -160,10 +159,17 @@ public class VoiceConnectionBuilder(
.build()
val udpSocket = udpSocket ?: GlobalVoiceUdpSocket
val audioProvider = audioProvider ?: EmptyAudioPlayerProvider
val audioSender =
audioSender ?: DefaultAudioFrameSender(DefaultAudioFrameSenderData(udpSocket))
val nonceStrategy = nonceStrategy ?: LiteNonceStrategy()
val frameInterceptorFactory = frameInterceptorFactory ?: { DefaultFrameInterceptor(it) }
val frameInterceptor = frameInterceptor ?: DefaultFrameInterceptor()
val audioSender =
audioSender ?: DefaultAudioFrameSender(
DefaultAudioFrameSenderData(
udpSocket,
frameInterceptor,
audioProvider,
nonceStrategy
)
)
val streams =
streams ?: if (receiveVoice) DefaultStreams(voiceGateway, udpSocket, nonceStrategy) else NOPStreams

Expand All @@ -175,9 +181,9 @@ public class VoiceConnectionBuilder(
initialGatewayConfiguration,
streams,
audioProvider,
frameInterceptor,
audioSender,
nonceStrategy,
frameInterceptorFactory,
nonceStrategy
)
}

Expand Down
9 changes: 3 additions & 6 deletions voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dev.kord.voice.handlers

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.EncryptionMode
import dev.kord.voice.FrameInterceptorContextBuilder
import dev.kord.voice.FrameInterceptorConfiguration
import dev.kord.voice.VoiceConnection
import dev.kord.voice.encryption.strategies.LiteNonceStrategy
import dev.kord.voice.encryption.strategies.NormalNonceStrategy
Expand Down Expand Up @@ -64,11 +64,8 @@ internal class UdpLifeCycleHandler(
val config = AudioFrameSenderConfiguration(
ssrc = ssrc!!,
key = it.secretKey.toUByteArray().toByteArray(),
nonceStrategy = nonceStrategy,
provider = audioProvider,
baseFrameInterceptorContext = FrameInterceptorContextBuilder(gateway, voiceGateway),
interceptorFactory = frameInterceptorFactory,
server = server!!
server = server!!,
interceptorConfiguration = FrameInterceptorConfiguration(gateway, voiceGateway, ssrc!!)
)

audioSenderJob?.cancel()
Expand Down
11 changes: 2 additions & 9 deletions voice/src/main/kotlin/udp/AudioFrameSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@
package dev.kord.voice.udp

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.AudioProvider
import dev.kord.voice.FrameInterceptor
import dev.kord.voice.FrameInterceptorContext
import dev.kord.voice.FrameInterceptorContextBuilder
import dev.kord.voice.encryption.strategies.NonceStrategy
import dev.kord.voice.FrameInterceptorConfiguration
import io.ktor.util.network.*

@KordVoice
public data class AudioFrameSenderConfiguration(
val server: NetworkAddress,
val ssrc: UInt,
val key: ByteArray,
val nonceStrategy: NonceStrategy,
val provider: AudioProvider,
val baseFrameInterceptorContext: FrameInterceptorContextBuilder,
val interceptorFactory: (FrameInterceptorContext) -> FrameInterceptor
val interceptorConfiguration: FrameInterceptorConfiguration
)

@KordVoice
Expand Down
36 changes: 18 additions & 18 deletions voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package dev.kord.voice.udp

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.AudioFrame
import dev.kord.voice.AudioProvider
import dev.kord.voice.FrameInterceptor
import dev.kord.voice.encryption.strategies.NonceStrategy
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import mu.KotlinLogging
import kotlin.random.Random
Expand All @@ -15,39 +18,36 @@ private val audioFrameSenderLogger = KotlinLogging.logger { }

@KordVoice
public data class DefaultAudioFrameSenderData(
val udp: VoiceUdpSocket
val udp: VoiceUdpSocket,
val interceptor: FrameInterceptor,
val provider: AudioProvider,
val nonceStrategy: NonceStrategy,
)

@KordVoice
public class DefaultAudioFrameSender(
public val data: DefaultAudioFrameSenderData
) : AudioFrameSender {
private fun createFrameInterceptor(configuration: AudioFrameSenderConfiguration): FrameInterceptor =
with(configuration) {
val builder = baseFrameInterceptorContext
builder.ssrc = ssrc
return interceptorFactory(builder.build()) // we should assume that everything else is set before-hand in the base builder
}

override suspend fun start(configuration: AudioFrameSenderConfiguration): Unit = coroutineScope {
val interceptor: FrameInterceptor = createFrameInterceptor(configuration)
var sequence: UShort = Random.nextBits(UShort.SIZE_BITS).toUShort()

val packetProvider = DefaultAudioPackerProvider(configuration.key, configuration.nonceStrategy)
val packetProvider = DefaultAudioPackerProvider(configuration.key, data.nonceStrategy)

val frames = Channel<AudioFrame?>(Channel.RENDEZVOUS)
with(configuration.provider) { launch { provideFrames(frames) } }
with(data.provider) { launch { provideFrames(frames) } }

audioFrameSenderLogger.trace { "audio poller starting." }

try {
for (frame in frames) {
val consumedFrame = interceptor.intercept(frame) ?: continue
val packet = packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, consumedFrame.data)

data.udp.send(Datagram(ByteReadPacket(packet.data, packet.dataStart, packet.viewSize), configuration.server))

sequence++
with(data.interceptor) {
frames.consumeAsFlow()
.intercept(configuration.interceptorConfiguration)
.filterNotNull()
.map { packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, it.data) }
.map { Datagram(ByteReadPacket(it.data, it.dataStart, it.viewSize), configuration.server) }
.onEach(data.udp::send)
.onEach { sequence++ }
.collect()
}
} catch (e: Exception) {
audioFrameSenderLogger.trace(e) { "poller stopped with reason" }
Expand Down

0 comments on commit 39974ba

Please sign in to comment.