Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate creation & usage of OxDispatcher #3759

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ case class NettySyncServer(
* server binding, to be used to control stopping of the server or obtaining metadata like port.
*/
def start()(using Ox): NettySyncServerBinding =
startUsingSocketOverride[InetSocketAddress](None, new OxDispatcher()) match
startUsingSocketOverride[InetSocketAddress](None, OxDispatcher.create) match
case (socket, stop) =>
NettySyncServerBinding(socket, stop)

Expand All @@ -87,7 +87,7 @@ case class NettySyncServer(
NettySyncServerBinding(socket, stop)

def startUsingDomainSocket(path: Path)(using Ox): NettySyncDomainSocketBinding =
startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), new OxDispatcher()) match
startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), OxDispatcher.create) match
case (socket, stop) =>
NettySyncDomainSocketBinding(socket, stop)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package sttp.tapir.server.netty.sync.internal.ox

import ox.*
import ox.channels.Actor
import ox.channels.{Actor, ActorRef}

import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.concurrent.Promise

/** A dispatcher that can start arbitrary forks. Useful when one needs to start an asynchronous task from a thread outside of an Ox scope.
* Normally Ox doesn't allow to start forks from other threads, for example in callbacks of external libraries. If you create an
* OxDispatcher inside a scope and pass it for potential handling on another thread, that thread can call
/** A dispatcher that can start forks, within some "global" scope. Useful when one needs to start an asynchronous task from a thread outside
* of an Ox scope. Normally Ox doesn't allow to start forks from arbitrary threads, for example in callbacks of external libraries. If you
* create an `OxDispatcher` inside a scope and pass it for potential handling on another thread, that thread can call
* {{{
* dispatcher.runAsync {
* // code to be executed in a fork
Expand All @@ -19,22 +19,29 @@ import scala.concurrent.Promise
* }}}
* WARNING! Dispatchers should only be used in special cases, where the proper structure of concurrency scopes cannot be preserved. One
* such example is integration with callback-based systems like Netty, which runs handler methods on its event loop thread.
* @param ox
* concurrency scope where a fork will be run, using a nested scope to isolate failures.
*/
private[sync] class OxDispatcher()(using ox: Ox):
private class Runner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit =
forkPromise
.success(forkCancellable {
try supervised(thunk)
catch case NonFatal(e) => onError(e)
})
.discard

private val actor = Actor.create(new Runner)

private[sync] class OxDispatcher private (actor: ActorRef[OxDispatcherRunner]):
def runAsync(thunk: Ox ?=> Unit)(onError: Throwable => Unit): Future[CancellableFork[Unit]] =
val forkPromise = Promise[CancellableFork[Unit]]()
actor.tell(_.runAsync(thunk, onError, forkPromise))
forkPromise.future

private trait OxDispatcherRunner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit

object OxDispatcher:
/** @param ox
* concurrency scope where forks will be run, using a nested scope to isolate failures. The dispatcher will only be usable as long as
* this scope doesn't complete.
*/
def create(using ox: Ox): OxDispatcher =
val actor = Actor.create {
new OxDispatcherRunner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit =
val fork = forkCancellable {
try supervised(thunk)
catch case NonFatal(e) => onError(e)
}
forkPromise.success(fork).discard
}
new OxDispatcher(actor)
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
override def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors): IdRoute = {
val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options
supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes
NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher())
NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create)
}
}

def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors)(using Ox): IdRoute = {
val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options
supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes
NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher())
NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create)
}
}

Expand Down
Loading