Skip to content

Commit

Permalink
fix #1908, fix #1909
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed May 7, 2024
1 parent e114138 commit 7f60644
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 56 deletions.
2 changes: 1 addition & 1 deletion otoroshi/app/OtoroshiLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class OtoroshiLoader extends ApplicationLoader {
components.env.beforeListening()
OtoroshiLoaderHelper.waitForReadiness(components)
components.env.afterListening()
new ReactorNettyServer(components.env).start(components.httpRequestHandler)
ReactorNettyServer.classic(components.env).start(components.httpRequestHandler)
components.application
}
}
Expand Down
4 changes: 2 additions & 2 deletions otoroshi/app/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ class Otoroshi(serverConfig: ServerConfig, configuration: Config = ConfigFactory
components.env.beforeListening()
OtoroshiLoaderHelper.waitForReadiness(components)
components.env.afterListening()
new ReactorNettyServer(components.env).start(components.httpRequestHandler)
ReactorNettyServer.classic(components.env).start(components.httpRequestHandler)
server.httpPort.get + 1
this
}
Expand All @@ -561,7 +561,7 @@ class Otoroshi(serverConfig: ServerConfig, configuration: Config = ConfigFactory
components.env.beforeListening()
OtoroshiLoaderHelper.waitForReadiness(components)
components.env.afterListening()
new ReactorNettyServer(components.env).start(components.httpRequestHandler)
ReactorNettyServer.classic(components.env).start(components.httpRequestHandler)
server.httpPort.get + 1
stopOnShutdown()
}
Expand Down
209 changes: 199 additions & 10 deletions otoroshi/app/netty/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import otoroshi.env.Env
import otoroshi.ssl._
import otoroshi.utils.syntax.implicits._
import play.api.Configuration
import play.api.libs.json.{Format, JsError, JsResult, JsSuccess, JsValue, Json}
import reactor.netty.http.HttpDecoderSpec

import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import scala.util.{Failure, Success, Try}

case class HttpRequestParserConfig(
allowDuplicateContentLengths: Boolean,
Expand All @@ -16,14 +19,59 @@ case class HttpRequestParserConfig(
maxHeaderSize: Int,
maxInitialLineLength: Int,
maxChunkSize: Int
)
) {
def json: JsValue = HttpRequestParserConfig.format.writes(this)
}

sealed trait NativeDriver
object HttpRequestParserConfig {
lazy val default = format.reads(Json.obj()).get
val format = new Format[HttpRequestParserConfig] {
override def reads(json: JsValue): JsResult[HttpRequestParserConfig] = Try {
HttpRequestParserConfig(
allowDuplicateContentLengths = json.select("allowDuplicateContentLengths").asOpt[Boolean].getOrElse(HttpDecoderSpec.DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS),
validateHeaders = json.select("validateHeaders").asOpt[Boolean].getOrElse(HttpDecoderSpec.DEFAULT_VALIDATE_HEADERS),
h2cMaxContentLength = json.select("h2cMaxContentLength").asOpt[Int].getOrElse(65536),
initialBufferSize = json.select("initialBufferSize").asOpt[Int].getOrElse(HttpDecoderSpec.DEFAULT_INITIAL_BUFFER_SIZE),
maxHeaderSize = json.select("maxHeaderSize").asOpt[Int].getOrElse(HttpDecoderSpec.DEFAULT_MAX_HEADER_SIZE),
maxInitialLineLength = json.select("maxInitialLineLength").asOpt[Int].getOrElse(HttpDecoderSpec.DEFAULT_MAX_INITIAL_LINE_LENGTH),
maxChunkSize = json.select("maxChunkSize").asOpt[Int].getOrElse(8192),
)
} match {
case Failure(exception) => JsError(exception.getMessage)
case Success(config) => JsSuccess(config)
}
override def writes(o: HttpRequestParserConfig): JsValue = Json.obj(
"allowDuplicateContentLengths" -> o.allowDuplicateContentLengths,
"validateHeaders" -> o.validateHeaders,
"h2cMaxContentLength" -> o.h2cMaxContentLength,
"initialBufferSize" -> o.initialBufferSize,
"maxHeaderSize" -> o.maxHeaderSize,
"maxInitialLineLength" -> o.maxInitialLineLength,
"maxChunkSize" -> o.maxChunkSize,
)
}
}

sealed trait NativeDriver {
def name: String
def json: JsValue = name.json
}
object NativeDriver {
case object Auto extends NativeDriver
case object Epoll extends NativeDriver
case object KQueue extends NativeDriver
case object IOUring extends NativeDriver
case object Auto extends NativeDriver { def name: String = "Auto" }
case object Epoll extends NativeDriver { def name: String = "Epoll" }
case object KQueue extends NativeDriver { def name: String = "KQueue" }
case object IOUring extends NativeDriver { def name: String = "IOUring" }
val format = new Format[NativeDriver] {

override def reads(json: JsValue): JsResult[NativeDriver] = json.asOpt[String].map(_.toLowerCase()) match {
case Some("auto") => JsSuccess(Auto)
case Some("epoll") => JsSuccess(Epoll)
case Some("kqueue") => JsSuccess(KQueue)
case Some("iouring") => JsSuccess(IOUring)
case v => JsError(s"unsupported value: ${v}")
}
override def writes(o: NativeDriver): JsValue = o.json
}
}

case class Http3Settings(
Expand All @@ -37,16 +85,93 @@ case class Http3Settings(
initialMaxStreamDataBidirectionalRemote: Long,
initialMaxStreamsBidirectional: Long,
disableQpackDynamicTable: Boolean
)
case class Http2Settings(enabled: Boolean, h2cEnabled: Boolean)
) {
def json: JsValue = Http3Settings.format.writes(this)
}
object Http3Settings {
lazy val default = format.reads(Json.obj()).get
val format = new Format[Http3Settings] {
override def reads(json: JsValue): JsResult[Http3Settings] = Try {
Http3Settings(
enabled = json.select("enabled").asOpt[Boolean].getOrElse(false),
port = json.select("port").asOpt[Int].getOrElse(-1),
exposedPort = json.select("exposedPort").asOpt[Int].getOrElse(-1),
maxSendUdpPayloadSize = json.select("maxSendUdpPayloadSize").asOpt[Long].getOrElse(1500),
maxRecvUdpPayloadSize = json.select("maxRecvUdpPayloadSize").asOpt[Long].getOrElse(1500),
initialMaxData = json.select("initialMaxData").asOpt[Long].getOrElse(10000000),
initialMaxStreamDataBidirectionalLocal = json.select("initialMaxStreamDataBidirectionalLocal").asOpt[Long].getOrElse(10000000),
initialMaxStreamDataBidirectionalRemote = json.select("initialMaxStreamDataBidirectionalRemote").asOpt[Long].getOrElse(10000000),
initialMaxStreamsBidirectional = json.select("initialMaxStreamsBidirectional").asOpt[Long].getOrElse(10000000),
disableQpackDynamicTable = json.select("disableQpackDynamicTable").asOpt[Boolean].getOrElse(true),
)
} match {
case Failure(exception) => JsError(exception.getMessage)
case Success(config) => JsSuccess(config)
}
override def writes(o: Http3Settings): JsValue = Json.obj(
"enabled" -> o.enabled,
"port" -> o.port,
"exposedPort" -> o.exposedPort,
"maxSendUdpPayloadSize" -> o.maxSendUdpPayloadSize,
"maxRecvUdpPayloadSize" -> o.maxRecvUdpPayloadSize,
"initialMaxData" -> o.initialMaxData,
"initialMaxStreamDataBidirectionalLocal" -> o.initialMaxStreamDataBidirectionalLocal,
"initialMaxStreamDataBidirectionalRemote" -> o.initialMaxStreamDataBidirectionalRemote,
"initialMaxStreamsBidirectional" -> o.initialMaxStreamsBidirectional,
"disableQpackDynamicTable" -> o.disableQpackDynamicTable,
)
}
}
case class Http2Settings(enabled: Boolean, h2cEnabled: Boolean) {
def json: JsValue = Http2Settings.format.writes(this)
}
object Http2Settings {
lazy val default = format.reads(Json.obj()).get
val format = new Format[Http2Settings] {
override def reads(json: JsValue): JsResult[Http2Settings] = Try {
Http2Settings(
enabled = json.select("enabled").asOpt[Boolean].getOrElse(true),
h2cEnabled = json.select("h2cEnabled").asOpt[Boolean].getOrElse(true)
)
} match {
case Failure(exception) => JsError(exception.getMessage)
case Success(config) => JsSuccess(config)
}
override def writes(o: Http2Settings): JsValue = Json.obj(
"enabled" -> o.enabled,
"h2cEnabled" -> o.h2cEnabled,
)
}
}
case class NativeSettings(enabled: Boolean, driver: NativeDriver) {
def isEpoll: Boolean = enabled && (driver == NativeDriver.Auto || driver == NativeDriver.Epoll)
def isKQueue: Boolean = enabled && (driver == NativeDriver.Auto || driver == NativeDriver.KQueue)
def isIOUring: Boolean = enabled && (driver == NativeDriver.Auto || driver == NativeDriver.IOUring)
def json: JsValue = NativeSettings.format.writes(this)
}
object NativeSettings {
lazy val default = format.reads(Json.obj()).get
val format = new Format[NativeSettings] {
override def reads(json: JsValue): JsResult[NativeSettings] = Try {
NativeSettings(
enabled = json.select("enabled").asOpt[Boolean].getOrElse(true),
driver = json.select("driver").asOpt(NativeDriver.format).getOrElse(NativeDriver.Auto)
)
} match {
case Failure(exception) => JsError(exception.getMessage)
case Success(config) => JsSuccess(config)
}
override def writes(o: NativeSettings): JsValue = Json.obj(
"enabled" -> o.enabled,
"driver" -> o.driver.json,
)
}
}

case class ReactorNettyServerConfig(
id: String,
enabled: Boolean,
exclusive: Boolean,
newEngineOnly: Boolean,
host: String,
httpPort: Int,
Expand All @@ -58,8 +183,8 @@ case class ReactorNettyServerConfig(
accessLog: Boolean,
cipherSuites: Option[Seq[String]],
protocols: Option[Seq[String]],
clientAuth: ClientAuth,
idleTimeout: java.time.Duration,
clientAuth: ClientAuth,
parser: HttpRequestParserConfig,
http2: Http2Settings,
http3: Http3Settings,
Expand All @@ -77,8 +202,14 @@ object ReactorNettyServerConfig {

def _parseFrom(env: Env): ReactorNettyServerConfig = {
val config = env.configuration.get[Configuration]("otoroshi.next.experimental.netty-server")
parseFromConfig(config, env, "classic".some)
}

def parseFromConfig(config: Configuration, env: Env, maybeId: Option[String]): ReactorNettyServerConfig = {
ReactorNettyServerConfig(
id = maybeId.orElse(config.getOptionalWithFileSupport[String]("id")).getOrElse(UUID.randomUUID().toString),
enabled = config.getOptionalWithFileSupport[Boolean]("enabled").getOrElse(false),
exclusive = config.getOptionalWithFileSupport[Boolean]("exclusive").getOrElse(false),
newEngineOnly = config.getOptionalWithFileSupport[Boolean]("new-engine-only").getOrElse(false),
host = config.getOptionalWithFileSupport[String]("host").getOrElse("0.0.0.0"),
httpPort = config.getOptionalWithFileSupport[Int]("http-port").getOrElse(env.httpPort + 50),
Expand Down Expand Up @@ -130,7 +261,7 @@ object ReactorNettyServerConfig {
.getOrElse(HttpDecoderSpec.DEFAULT_MAX_INITIAL_LINE_LENGTH),
maxChunkSize = config
.getOptionalWithFileSupport[Int]("parser.maxChunkSize")
.getOrElse(HttpDecoderSpec.DEFAULT_MAX_CHUNK_SIZE)
.getOrElse(8192)
),
http2 = Http2Settings(
enabled = config.getOptionalWithFileSupport[Boolean]("http2.enabled").getOrElse(true),
Expand Down Expand Up @@ -172,6 +303,64 @@ object ReactorNettyServerConfig {
)
)
}

val format = new Format[ReactorNettyServerConfig] {

override def reads(json: JsValue): JsResult[ReactorNettyServerConfig] = {
Try {
ReactorNettyServerConfig(
id = json.select("id").as[String],
enabled = json.select("enabled").asOpt[Boolean].getOrElse(false),
exclusive = json.select("exclusive").asOpt[Boolean].getOrElse(false),
newEngineOnly = json.select("new_engine_only").asOpt[Boolean].getOrElse(true),
host = json.select("host").asOpt[String].getOrElse("0.0.0.0"),
httpPort = json.select("http_port").asOpt[Int].getOrElse(-1),
exposedHttpPort = json.select("exposed_http_port").asOpt[Int].getOrElse(-1),
httpsPort = json.select("https_port").asOpt[Int].getOrElse(-1),
exposedHttpsPort = json.select("exposed_https_port").asOpt[Int].getOrElse(-1),
nThread = json.select("n_thread").asOpt[Int].getOrElse(0),
wiretap = json.select("wiretap").asOpt[Boolean].getOrElse(false),
accessLog = json.select("access_log").asOpt[Boolean].getOrElse(false),
cipherSuites = json.select("cipher_suites").asOpt[Seq[String]],
protocols = json.select("protocols").asOpt[Seq[String]],
idleTimeout = json.select("idle_timeout").asOpt[Long].map(v => java.time.Duration.ofMillis(v)).getOrElse(java.time.Duration.ofMillis(60000)),
clientAuth = json.select("client_auth").asOpt[String].flatMap(ClientAuth.apply).getOrElse(ClientAuth.None),
parser = json.select("parser").asOpt(HttpRequestParserConfig.format).getOrElse(HttpRequestParserConfig.default),
http2 = json.select("http_2").asOpt(Http2Settings.format).getOrElse(Http2Settings.default),
http3 = json.select("http_3").asOpt(Http3Settings.format).getOrElse(Http3Settings.default),
native = json.select("native").asOpt(NativeSettings.format).getOrElse(NativeSettings.default),
)
} match {
case Failure(exception) => JsError(exception.getMessage)
case Success(config) => JsSuccess(config)
}
}

override def writes(o: ReactorNettyServerConfig): JsValue = {
Json.obj(
"id" -> o.id,
"enabled" -> o.enabled,
"exclusive" -> o.exclusive,
"new_engine_only" -> o.newEngineOnly,
"host" -> o.host,
"http_port" -> o.httpPort,
"exposed_http_port" -> o.exposedHttpPort,
"https_port" -> o.httpsPort,
"exposed_https_port" -> o.exposedHttpsPort,
"n_thread" -> o.nThread,
"wiretap" -> o.wiretap,
"access_log" -> o.accessLog,
"cipher_suites" -> o.cipherSuites,
"protocols" -> o.protocols,
"idle_timeout" -> o.idleTimeout,
"client_auth" -> o.clientAuth.name,
"parser" -> o.parser.json,
"http_2" -> o.http2.json,
"http_3" -> o.http3.json,
"native" -> o.native.json,
)
}
}
}

case class NettyClientConfig(
Expand Down
18 changes: 14 additions & 4 deletions otoroshi/app/netty/http3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Http1RequestHandler(
}
log_protocol = session.map(_.getProtocol).flatMap(TlsVersion.parseSafe).map(_.name).getOrElse("-")
val rawOtoReq =
new NettyRequest(req, ctx, Flux.empty(), true, session, sessionCookieBaker, flashCookieBaker, addressGet)
new NettyRequest(config.id, req, ctx, Flux.empty(), true, session, sessionCookieBaker, flashCookieBaker, addressGet)
val hasBody = otoroshi.utils.body.BodyUtils.hasBodyWithoutOrZeroLength(rawOtoReq)._1
val bodyIn: Flux[ByteString] = if (hasBody) hotFlux else Flux.empty()
val otoReq = rawOtoReq.withBody(bodyIn)
Expand Down Expand Up @@ -570,9 +570,9 @@ class NettyHttp3Server(config: ReactorNettyServerConfig, env: Env) {
handler: HttpRequestHandler,
sessionCookieBaker: SessionCookieBaker,
flashCookieBaker: FlashCookieBaker
): Unit = {
): DisposableNettyHttp3Server = {

if (config.http3.enabled) {
if (config.http3.enabled && config.http3.port != -1) {

import io.netty.bootstrap._
import io.netty.channel._
Expand Down Expand Up @@ -694,9 +694,19 @@ class NettyHttp3Server(config: ReactorNettyServerConfig, env: Env) {
.sync()
.channel()
channel.closeFuture()
val disposableServer = DisposableNettyHttp3Server(group.some)
Runtime.getRuntime.addShutdownHook(new Thread(() => {
group.shutdownGracefully();
disposableServer.stop()
}))
disposableServer
} else {
DisposableNettyHttp3Server(None)
}
}
}

case class DisposableNettyHttp3Server(group: Option[NioEventLoopGroup]) {
def stop(): Unit = {
group.foreach(_.shutdownGracefully())
}
}
Loading

0 comments on commit 7f60644

Please sign in to comment.