Skip to content

Commit

Permalink
Use Kotlin Coroutines Flow instead of Project reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Syer10 committed Mar 30, 2023
1 parent 847a5fe commit bce76bb
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 67 deletions.
7 changes: 3 additions & 4 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ dependencies {
// implementation(fileTree("lib/"))
implementation(kotlin("script-runtime"))

implementation("com.expediagroup", "graphql-kotlin-server", "6.3.0")
implementation("com.expediagroup", "graphql-kotlin-schema-generator", "6.3.0")
implementation("com.graphql-java", "graphql-java-extended-scalars", "19.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.5.0-RC-native-mt")
implementation("com.expediagroup:graphql-kotlin-server:6.3.0")
implementation("com.expediagroup:graphql-kotlin-schema-generator:6.3.0")
implementation("com.graphql-java:graphql-java-extended-scalars:20.0")

testImplementation(libs.mockk)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import java.io.IOException

class JavalinGraphQLRequestParser : GraphQLRequestParser<Context> {

@Suppress("BlockingMethodInNonBlockingContext")
@Suppress("BlockingMethodInNonBlockingContext", "PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override suspend fun parseRequest(context: Context): GraphQLServerRequest = try {
context.bodyAsClass(GraphQLServerRequest::class.java)
} catch (e: IOException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package suwayomi.tachidesk.graphql.server

import com.expediagroup.graphql.generator.SchemaGeneratorConfig
import com.expediagroup.graphql.generator.TopLevelObject
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorHooks
import com.expediagroup.graphql.generator.toSchema
import graphql.scalars.ExtendedScalars
import graphql.schema.GraphQLType
Expand All @@ -21,10 +21,10 @@ import suwayomi.tachidesk.graphql.subscriptions.DownloadSubscription
import kotlin.reflect.KClass
import kotlin.reflect.KType

class CustomSchemaGeneratorHooks : SchemaGeneratorHooks {
class CustomSchemaGeneratorHooks : FlowSubscriptionSchemaGeneratorHooks() {
override fun willGenerateGraphQLType(type: KType): GraphQLType? = when (type.classifier as? KClass<*>) {
Long::class -> ExtendedScalars.GraphQLLong
else -> null
else -> super.willGenerateGraphQLType(type)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import graphql.GraphQL
import io.javalin.http.Context
import io.javalin.websocket.WsCloseContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import suwayomi.tachidesk.graphql.server.subscriptions.ApolloSubscriptionProtocolHandler
import suwayomi.tachidesk.graphql.server.subscriptions.GraphQLSubscriptionHandler

Expand All @@ -31,7 +34,7 @@ class TachideskGraphQLServer(
subscriptionProtocolHandler.handleMessage(context)
.map { objectMapper.writeValueAsString(it) }
.map { context.send(it) }
.subscribe()
.launchIn(GlobalScope)
}

fun handleSubscriptionDisconnect(context: WsCloseContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.readValue
import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import suwayomi.tachidesk.graphql.server.TachideskGraphQLContextFactory
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ClientMessages.*
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.*
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.time.Duration

/**
* Implementation of the `graphql-ws` protocol defined by Apollo
Expand All @@ -42,8 +49,8 @@ class ApolloSubscriptionProtocolHandler(
private val acknowledgeMessage = SubscriptionOperationMessage(GQL_CONNECTION_ACK.type)

@Suppress("Detekt.TooGenericExceptionCaught")
fun handleMessage(context: WsMessageContext): Flux<SubscriptionOperationMessage> {
val operationMessage = convertToMessageOrNull(context.message()) ?: return Flux.just(basicConnectionErrorMessage)
fun handleMessage(context: WsMessageContext): Flow<SubscriptionOperationMessage> {
val operationMessage = convertToMessageOrNull(context.message()) ?: return flowOf(basicConnectionErrorMessage)
logger.debug("GraphQL subscription client message, sessionId=${context.sessionId} operationMessage=$operationMessage")

return try {
Expand Down Expand Up @@ -77,69 +84,70 @@ class ApolloSubscriptionProtocolHandler(
* If the keep alive configuration is set, send a message back to client at every interval until the session is terminated.
* Otherwise just return empty flux to append to the acknowledge message.
*/
private fun getKeepAliveFlux(context: WsContext): Flux<SubscriptionOperationMessage> {
@OptIn(FlowPreview::class)
private fun getKeepAliveFlow(context: WsContext): Flow<SubscriptionOperationMessage> {
val keepAliveInterval: Long? = 2000
if (keepAliveInterval != null) {
return Flux.interval(Duration.ofMillis(keepAliveInterval))
.map { keepAliveMessage }
.doOnSubscribe { sessionState.saveKeepAliveSubscription(context, it) }
return flowOf(keepAliveMessage).sample(keepAliveInterval)
.onStart {
sessionState.saveKeepAliveSubscription(context, currentCoroutineContext().job)
}
}

return Flux.empty()
return emptyFlow()
}

@Suppress("Detekt.TooGenericExceptionCaught")
private fun startSubscription(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
): Flow<SubscriptionOperationMessage> {
val graphQLContext = sessionState.getGraphQLContext(context)

if (operationMessage.id == null) {
logger.error("GraphQL subscription operation id is required")
return Flux.just(basicConnectionErrorMessage)
return flowOf(basicConnectionErrorMessage)
}

if (sessionState.doesOperationExist(context, operationMessage)) {
logger.info("Already subscribed to operation ${operationMessage.id} for session ${context.sessionId}")
return Flux.empty()
return emptyFlow()
}

val payload = operationMessage.payload

if (payload == null) {
logger.error("GraphQL subscription payload was null instead of a GraphQLRequest object")
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}

try {
val request = objectMapper.convertValue<GraphQLRequest>(payload)
return subscriptionHandler.executeSubscription(request, graphQLContext)
.asFlux()
.map {
if (it.errors?.isNotEmpty() == true) {
SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it)
} else {
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
}
}
.concatWith(onComplete(operationMessage, context).toFlux())
.doOnSubscribe { sessionState.saveOperation(context, operationMessage, it) }
.onCompletion { if (it == null) emitAll(onComplete(operationMessage, context)) }
.onStart { sessionState.saveOperation(context, operationMessage, currentCoroutineContext().job) }
} catch (exception: Exception) {
logger.error("Error running graphql subscription", exception)
// Do not terminate the session, just stop the operation messages
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
}

private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow<SubscriptionOperationMessage> {
saveContext(operationMessage, context)
val acknowledgeMessage = Mono.just(acknowledgeMessage)
val keepAliveFlux = getKeepAliveFlux(context)
return acknowledgeMessage.concatWith(keepAliveFlux)
.onErrorReturn(getConnectionErrorMessage(operationMessage))
val acknowledgeMessage = flowOf(acknowledgeMessage)
val keepAliveFlux = getKeepAliveFlow(context)
return acknowledgeMessage.onCompletion { if (it == null) emitAll(keepAliveFlux) }
.catch { emit(getConnectionErrorMessage(operationMessage)) }
}

/**
Expand All @@ -158,7 +166,7 @@ class ApolloSubscriptionProtocolHandler(
private fun onComplete(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Mono<SubscriptionOperationMessage> {
): Flow<SubscriptionOperationMessage> {
return sessionState.completeOperation(context, operationMessage)
}

Expand All @@ -168,24 +176,24 @@ class ApolloSubscriptionProtocolHandler(
private fun onStop(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
return sessionState.stopOperation(context, operationMessage).toFlux()
): Flow<SubscriptionOperationMessage> {
return sessionState.stopOperation(context, operationMessage)
}

private fun onDisconnect(context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onDisconnect(context: WsContext): Flow<SubscriptionOperationMessage> {
sessionState.terminateSession(context)
return Flux.empty()
return emptyFlow()
}

private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow<SubscriptionOperationMessage> {
logger.error("Unknown subscription operation $operationMessage")
sessionState.stopOperation(context, operationMessage)
return Flux.just(getConnectionErrorMessage(operationMessage))
return flowOf(getConnectionErrorMessage(operationMessage))
}

private fun onException(exception: Exception): Flux<SubscriptionOperationMessage> {
private fun onException(exception: Exception): Flow<SubscriptionOperationMessage> {
logger.error("Error parsing the subscription message", exception)
return Flux.just(basicConnectionErrorMessage)
return flowOf(basicConnectionErrorMessage)
}

private fun getConnectionErrorMessage(operationMessage: SubscriptionOperationMessage): SubscriptionOperationMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ package suwayomi.tachidesk.graphql.server.subscriptions

import graphql.GraphQLContext
import io.javalin.websocket.WsContext
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.util.concurrent.ConcurrentHashMap

internal class ApolloSubscriptionSessionState {

// Sessions are saved by web socket session id
internal val activeKeepAliveSessions = ConcurrentHashMap<String, Subscription>()
internal val activeKeepAliveSessions = ConcurrentHashMap<String, Job>()

// Operations are saved by web socket session id, then operation id
internal val activeOperations = ConcurrentHashMap<String, ConcurrentHashMap<String, Subscription>>()
internal val activeOperations = ConcurrentHashMap<String, ConcurrentHashMap<String, Job>>()

// The graphQL context is saved by web socket session id
private val cachedGraphQLContext = ConcurrentHashMap<String, GraphQLContext>()
Expand All @@ -45,7 +48,7 @@ internal class ApolloSubscriptionSessionState {
* This will override values without cancelling the subscription, so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [terminateSession].
*/
fun saveKeepAliveSubscription(context: WsContext, subscription: Subscription) {
fun saveKeepAliveSubscription(context: WsContext, subscription: Job) {
activeKeepAliveSessions[context.sessionId] = subscription
}

Expand All @@ -54,10 +57,10 @@ internal class ApolloSubscriptionSessionState {
* This will override values without cancelling the subscription so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [stopOperation].
*/
fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Subscription) {
fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Job) {
val id = operationMessage.id
if (id != null) {
val operationsForSession: ConcurrentHashMap<String, Subscription> = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() }
val operationsForSession: ConcurrentHashMap<String, Job> = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() }
operationsForSession[id] = subscription
}
}
Expand All @@ -66,26 +69,26 @@ internal class ApolloSubscriptionSessionState {
* Send the [GQL_COMPLETE] message.
* This can happen when the publisher finishes or if the client manually sends the stop message.
*/
fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) }
.onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) }
}

/**
* Stop the subscription sending data and send the [GQL_COMPLETE] message.
* Does NOT terminate the session.
*/
fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) }
.onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) }
}

private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Flow<SubscriptionOperationMessage> {
val id = operationMessage.id
if (id != null) {
return Mono.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id))
return flowOf(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id))
}
return Mono.empty()
return emptyFlow()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

package suwayomi.tachidesk.graphql.server.subscriptions

import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow

class FluxSubscriptionSource<T : Any>() {
private var sink: FluxSink<T>? = null
val emitter: Flux<T> = Flux.create<T> { emitter -> sink = emitter }
class FlowSubscriptionSource<T : Any> {
private val mutableSharedFlow = MutableSharedFlow<T>()
val emitter = mutableSharedFlow.asSharedFlow()

fun publish(value: T) {
sink?.next(value)
mutableSharedFlow.tryEmit(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
package suwayomi.tachidesk.graphql.subscriptions

import graphql.schema.DataFetchingEnvironment
import reactor.core.publisher.Flux
import suwayomi.tachidesk.graphql.server.subscriptions.FluxSubscriptionSource
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import suwayomi.tachidesk.graphql.server.subscriptions.FlowSubscriptionSource
import suwayomi.tachidesk.graphql.types.DownloadType
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter

val downloadSubscriptionSource = FluxSubscriptionSource<DownloadChapter>()
val downloadSubscriptionSource = FlowSubscriptionSource<DownloadChapter>()

class DownloadSubscription {
fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flux<DownloadType> {
fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flow<DownloadType> {
return downloadSubscriptionSource.emitter.map { downloadChapter ->
DownloadType(downloadChapter)
}
Expand Down

0 comments on commit bce76bb

Please sign in to comment.