diff --git a/server/build.gradle.kts b/server/build.gradle.kts index cbca5f9f0..cb5ff96d6 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -64,10 +64,9 @@ dependencies { // implementation(fileTree("lib/")) implementation(kotlin("script-runtime")) - implementation("com.expediagroup", "graphql-kotlin-server", "6.3.0") - implementation("com.expediagroup", "graphql-kotlin-schema-generator", "6.3.0") - implementation("com.graphql-java", "graphql-java-extended-scalars", "19.0") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.5.0-RC-native-mt") + implementation("com.expediagroup:graphql-kotlin-server:6.3.0") + implementation("com.expediagroup:graphql-kotlin-schema-generator:6.3.0") + implementation("com.graphql-java:graphql-java-extended-scalars:20.0") testImplementation(libs.mockk) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt index 7dc243b43..c2497cd29 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt @@ -14,7 +14,7 @@ import java.io.IOException class JavalinGraphQLRequestParser : GraphQLRequestParser { - @Suppress("BlockingMethodInNonBlockingContext") + @Suppress("BlockingMethodInNonBlockingContext", "PARAMETER_NAME_CHANGED_ON_OVERRIDE") override suspend fun parseRequest(context: Context): GraphQLServerRequest = try { context.bodyAsClass(GraphQLServerRequest::class.java) } catch (e: IOException) { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt index e5de168b5..c30a1af02 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt @@ -9,7 +9,7 @@ package suwayomi.tachidesk.graphql.server import com.expediagroup.graphql.generator.SchemaGeneratorConfig import com.expediagroup.graphql.generator.TopLevelObject -import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks +import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorHooks import com.expediagroup.graphql.generator.toSchema import graphql.scalars.ExtendedScalars import graphql.schema.GraphQLType @@ -21,10 +21,10 @@ import suwayomi.tachidesk.graphql.subscriptions.DownloadSubscription import kotlin.reflect.KClass import kotlin.reflect.KType -class CustomSchemaGeneratorHooks : SchemaGeneratorHooks { +class CustomSchemaGeneratorHooks : FlowSubscriptionSchemaGeneratorHooks() { override fun willGenerateGraphQLType(type: KType): GraphQLType? = when (type.classifier as? KClass<*>) { Long::class -> ExtendedScalars.GraphQLLong - else -> null + else -> super.willGenerateGraphQLType(type) } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt index 65f6a1702..47d3e29eb 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt @@ -15,6 +15,9 @@ import graphql.GraphQL import io.javalin.http.Context import io.javalin.websocket.WsCloseContext import io.javalin.websocket.WsMessageContext +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map import suwayomi.tachidesk.graphql.server.subscriptions.ApolloSubscriptionProtocolHandler import suwayomi.tachidesk.graphql.server.subscriptions.GraphQLSubscriptionHandler @@ -31,7 +34,7 @@ class TachideskGraphQLServer( subscriptionProtocolHandler.handleMessage(context) .map { objectMapper.writeValueAsString(it) } .map { context.send(it) } - .subscribe() + .launchIn(GlobalScope) } fun handleSubscriptionDisconnect(context: WsCloseContext) { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt index 53df508e3..c4d418ba5 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt @@ -13,17 +13,24 @@ import com.fasterxml.jackson.module.kotlin.convertValue import com.fasterxml.jackson.module.kotlin.readValue import io.javalin.websocket.WsContext import io.javalin.websocket.WsMessageContext -import kotlinx.coroutines.reactor.asFlux +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.job import kotlinx.coroutines.runBlocking import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.toFlux import suwayomi.tachidesk.graphql.server.TachideskGraphQLContextFactory import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ClientMessages.* import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.* import suwayomi.tachidesk.graphql.server.toGraphQLContext -import java.time.Duration /** * Implementation of the `graphql-ws` protocol defined by Apollo @@ -42,8 +49,8 @@ class ApolloSubscriptionProtocolHandler( private val acknowledgeMessage = SubscriptionOperationMessage(GQL_CONNECTION_ACK.type) @Suppress("Detekt.TooGenericExceptionCaught") - fun handleMessage(context: WsMessageContext): Flux { - val operationMessage = convertToMessageOrNull(context.message()) ?: return Flux.just(basicConnectionErrorMessage) + fun handleMessage(context: WsMessageContext): Flow { + val operationMessage = convertToMessageOrNull(context.message()) ?: return flowOf(basicConnectionErrorMessage) logger.debug("GraphQL subscription client message, sessionId=${context.sessionId} operationMessage=$operationMessage") return try { @@ -77,32 +84,34 @@ class ApolloSubscriptionProtocolHandler( * If the keep alive configuration is set, send a message back to client at every interval until the session is terminated. * Otherwise just return empty flux to append to the acknowledge message. */ - private fun getKeepAliveFlux(context: WsContext): Flux { + @OptIn(FlowPreview::class) + private fun getKeepAliveFlow(context: WsContext): Flow { val keepAliveInterval: Long? = 2000 if (keepAliveInterval != null) { - return Flux.interval(Duration.ofMillis(keepAliveInterval)) - .map { keepAliveMessage } - .doOnSubscribe { sessionState.saveKeepAliveSubscription(context, it) } + return flowOf(keepAliveMessage).sample(keepAliveInterval) + .onStart { + sessionState.saveKeepAliveSubscription(context, currentCoroutineContext().job) + } } - return Flux.empty() + return emptyFlow() } @Suppress("Detekt.TooGenericExceptionCaught") private fun startSubscription( operationMessage: SubscriptionOperationMessage, context: WsContext - ): Flux { + ): Flow { val graphQLContext = sessionState.getGraphQLContext(context) if (operationMessage.id == null) { logger.error("GraphQL subscription operation id is required") - return Flux.just(basicConnectionErrorMessage) + return flowOf(basicConnectionErrorMessage) } if (sessionState.doesOperationExist(context, operationMessage)) { logger.info("Already subscribed to operation ${operationMessage.id} for session ${context.sessionId}") - return Flux.empty() + return emptyFlow() } val payload = operationMessage.payload @@ -110,13 +119,12 @@ class ApolloSubscriptionProtocolHandler( if (payload == null) { logger.error("GraphQL subscription payload was null instead of a GraphQLRequest object") sessionState.stopOperation(context, operationMessage) - return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) + return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) } try { val request = objectMapper.convertValue(payload) return subscriptionHandler.executeSubscription(request, graphQLContext) - .asFlux() .map { if (it.errors?.isNotEmpty() == true) { SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it) @@ -124,22 +132,22 @@ class ApolloSubscriptionProtocolHandler( SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it) } } - .concatWith(onComplete(operationMessage, context).toFlux()) - .doOnSubscribe { sessionState.saveOperation(context, operationMessage, it) } + .onCompletion { if (it == null) emitAll(onComplete(operationMessage, context)) } + .onStart { sessionState.saveOperation(context, operationMessage, currentCoroutineContext().job) } } catch (exception: Exception) { logger.error("Error running graphql subscription", exception) // Do not terminate the session, just stop the operation messages sessionState.stopOperation(context, operationMessage) - return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) + return flowOf(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) } } - private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux { + private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow { saveContext(operationMessage, context) - val acknowledgeMessage = Mono.just(acknowledgeMessage) - val keepAliveFlux = getKeepAliveFlux(context) - return acknowledgeMessage.concatWith(keepAliveFlux) - .onErrorReturn(getConnectionErrorMessage(operationMessage)) + val acknowledgeMessage = flowOf(acknowledgeMessage) + val keepAliveFlux = getKeepAliveFlow(context) + return acknowledgeMessage.onCompletion { if (it == null) emitAll(keepAliveFlux) } + .catch { emit(getConnectionErrorMessage(operationMessage)) } } /** @@ -158,7 +166,7 @@ class ApolloSubscriptionProtocolHandler( private fun onComplete( operationMessage: SubscriptionOperationMessage, context: WsContext - ): Mono { + ): Flow { return sessionState.completeOperation(context, operationMessage) } @@ -168,24 +176,24 @@ class ApolloSubscriptionProtocolHandler( private fun onStop( operationMessage: SubscriptionOperationMessage, context: WsContext - ): Flux { - return sessionState.stopOperation(context, operationMessage).toFlux() + ): Flow { + return sessionState.stopOperation(context, operationMessage) } - private fun onDisconnect(context: WsContext): Flux { + private fun onDisconnect(context: WsContext): Flow { sessionState.terminateSession(context) - return Flux.empty() + return emptyFlow() } - private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux { + private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flow { logger.error("Unknown subscription operation $operationMessage") sessionState.stopOperation(context, operationMessage) - return Flux.just(getConnectionErrorMessage(operationMessage)) + return flowOf(getConnectionErrorMessage(operationMessage)) } - private fun onException(exception: Exception): Flux { + private fun onException(exception: Exception): Flow { logger.error("Error parsing the subscription message", exception) - return Flux.just(basicConnectionErrorMessage) + return flowOf(basicConnectionErrorMessage) } private fun getConnectionErrorMessage(operationMessage: SubscriptionOperationMessage): SubscriptionOperationMessage { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt index 7e2358ed5..f9ec6b0f2 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt @@ -9,8 +9,11 @@ package suwayomi.tachidesk.graphql.server.subscriptions import graphql.GraphQLContext import io.javalin.websocket.WsContext -import org.reactivestreams.Subscription -import reactor.core.publisher.Mono +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onCompletion import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE import suwayomi.tachidesk.graphql.server.toGraphQLContext import java.util.concurrent.ConcurrentHashMap @@ -18,10 +21,10 @@ import java.util.concurrent.ConcurrentHashMap internal class ApolloSubscriptionSessionState { // Sessions are saved by web socket session id - internal val activeKeepAliveSessions = ConcurrentHashMap() + internal val activeKeepAliveSessions = ConcurrentHashMap() // Operations are saved by web socket session id, then operation id - internal val activeOperations = ConcurrentHashMap>() + internal val activeOperations = ConcurrentHashMap>() // The graphQL context is saved by web socket session id private val cachedGraphQLContext = ConcurrentHashMap() @@ -45,7 +48,7 @@ internal class ApolloSubscriptionSessionState { * This will override values without cancelling the subscription, so it is the responsibility of the consumer to cancel. * These messages will be stopped on [terminateSession]. */ - fun saveKeepAliveSubscription(context: WsContext, subscription: Subscription) { + fun saveKeepAliveSubscription(context: WsContext, subscription: Job) { activeKeepAliveSessions[context.sessionId] = subscription } @@ -54,10 +57,10 @@ internal class ApolloSubscriptionSessionState { * This will override values without cancelling the subscription so it is the responsibility of the consumer to cancel. * These messages will be stopped on [stopOperation]. */ - fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Subscription) { + fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Job) { val id = operationMessage.id if (id != null) { - val operationsForSession: ConcurrentHashMap = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() } + val operationsForSession: ConcurrentHashMap = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() } operationsForSession[id] = subscription } } @@ -66,26 +69,26 @@ internal class ApolloSubscriptionSessionState { * Send the [GQL_COMPLETE] message. * This can happen when the publisher finishes or if the client manually sends the stop message. */ - fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono { + fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow { return getCompleteMessage(operationMessage) - .doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) } + .onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) } } /** * Stop the subscription sending data and send the [GQL_COMPLETE] message. * Does NOT terminate the session. */ - fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono { + fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Flow { return getCompleteMessage(operationMessage) - .doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) } + .onCompletion { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) } } - private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Mono { + private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Flow { val id = operationMessage.id if (id != null) { - return Mono.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id)) + return flowOf(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id)) } - return Mono.empty() + return emptyFlow() } /** diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FlowSubscriptionSource.kt similarity index 56% rename from server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt rename to server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FlowSubscriptionSource.kt index ab4c16753..16c60863c 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FlowSubscriptionSource.kt @@ -7,14 +7,14 @@ package suwayomi.tachidesk.graphql.server.subscriptions -import reactor.core.publisher.Flux -import reactor.core.publisher.FluxSink +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow -class FluxSubscriptionSource() { - private var sink: FluxSink? = null - val emitter: Flux = Flux.create { emitter -> sink = emitter } +class FlowSubscriptionSource { + private val mutableSharedFlow = MutableSharedFlow() + val emitter = mutableSharedFlow.asSharedFlow() fun publish(value: T) { - sink?.next(value) + mutableSharedFlow.tryEmit(value) } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt index 6f1db4f64..deb9bacc1 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt @@ -8,15 +8,16 @@ package suwayomi.tachidesk.graphql.subscriptions import graphql.schema.DataFetchingEnvironment -import reactor.core.publisher.Flux -import suwayomi.tachidesk.graphql.server.subscriptions.FluxSubscriptionSource +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import suwayomi.tachidesk.graphql.server.subscriptions.FlowSubscriptionSource import suwayomi.tachidesk.graphql.types.DownloadType import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter -val downloadSubscriptionSource = FluxSubscriptionSource() +val downloadSubscriptionSource = FlowSubscriptionSource() class DownloadSubscription { - fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flux { + fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flow { return downloadSubscriptionSource.emitter.map { downloadChapter -> DownloadType(downloadChapter) }