Skip to content

Commit

Permalink
netty (fix): Fix async REST response handling (Scala Future, error Rx) (
Browse files Browse the repository at this point in the history
#3351)

- Provide a thread manager in NettyServerConfig
- Fix async request handling to properly catch the last response
- Fix a blocking bug when REST API returns Future[_] response type
  • Loading branch information
xerial authored Jan 23, 2024
1 parent 1c91d6a commit fa62a57
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class FinagleRouter(session: Session, private[finagle] val config: FinagleServer
config.controllerProvider,
FinagleBackend,
config.responseHandler,
MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec))
MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec)),
config.executionContext
)

override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.util.{Await, Future}
import javax.annotation.PostConstruct
import wvlet.airframe.*
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.control.MultipleExceptions
import wvlet.airframe.control.{MultipleExceptions, ThreadUtil}
import wvlet.airframe.http.finagle.FinagleServer.FinagleService
import wvlet.airframe.http.finagle.filter.HttpAccessLogFilter
import wvlet.airframe.http.{
Expand All @@ -40,9 +40,10 @@ import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.collection.parallel.immutable.ParVector
import scala.concurrent.ExecutionException
import scala.concurrent.{ExecutionContext, ExecutionException}
import scala.util.Try
import scala.util.control.NonFatal

Expand All @@ -62,7 +63,15 @@ case class FinagleServerConfig(
// A top-level filter applied before routing requests
beforeRoutingFilter: Filter[Request, Response, Request, Response] = Filter.identity,
// Service called when no matching route is found
fallbackService: Service[Request, Response] = FinagleServer.notFound
fallbackService: Service[Request, Response] = FinagleServer.notFound,
// Thread manager for handling Future[_] responses
executionContext: ExecutionContext = {
// Using the global thread pool causes an issue in sbt's layered class loader #918
// So need to use the local daemon thread pool
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-finagle"))
)
}
) {
// Lazily acquire an unused port to avoid conflicts between multiple servers
lazy val port = serverPort.getOrElse(IOUtil.unusedPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import wvlet.airframe.rx.Rx
import wvlet.log.LogSupport

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport { self =>
private val rxBackend = new RxNettyBackend
Expand All @@ -37,17 +38,18 @@ object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport {
Rx.single(a)
}

override def toFuture[A](a: Future[A], e: ExecutionContext): Rx[A] = {
Rx.future(a)(e)
override def toFuture[A](a: Future[A], ex: ExecutionContext): Rx[A] = {
val v = Await.result(a, scala.concurrent.duration.Duration.Inf)
Rx.single(v)
}

override def toScalaFuture[A](a: Rx[A]): Future[A] = {
val promise: Promise[A] = Promise()
a.toRx
.map { x =>
promise.success(x)
}
.recover { case e: Throwable => promise.failure(e) }
val rx = a.transform {
case Success(x) => promise.success(x)
case Failure(ex) => promise.failure(ex)
}
rx.run { effect => }
promise.future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import wvlet.airframe.{Design, Session}
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.TimeUnit
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.PostConstruct
import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

case class NettyServerConfig(
Expand All @@ -52,7 +53,15 @@ case class NettyServerConfig(
new LogRotationHttpLogger(config)
},
loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) },
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty,
// Thread manager for handling Future[_] responses
executionContext: ExecutionContext = {
// Using the global thread pool causes an issue in sbt's layered class loader #918
// So need to use the local daemon thread pool
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-netty"))
)
}
) {
lazy val port = serverPort.getOrElse(IOUtil.unusedPort)

Expand Down Expand Up @@ -149,7 +158,6 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
new NioEventLoopGroup(numWorkers, tf)
}
}

private var channelFuture: Option[Channel] = None

override def localAddress: String = s"localhost:${config.port}"
Expand Down Expand Up @@ -238,7 +246,8 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
NettyBackend,
new NettyResponseHandler,
// Set a custom codec and use JSON map output
MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec)
MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec),
config.executionContext
)
)
}
Expand Down
Loading

0 comments on commit fa62a57

Please sign in to comment.