Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2 rapid reset mitigation #344

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import pekko.http.scaladsl.settings.{ ClientConnectionSettings, ServerSettings }
import pekko.stream.TLSProtocol.{ SslTlsInbound, SslTlsOutbound }
import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source }
import pekko.util.ByteString
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._

import java.util.concurrent.{ CountDownLatch, TimeUnit }
Expand All @@ -40,6 +41,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc

val numRequests = 1000

@Param(Array("1s", "0s"))
var resetFrameThrottleInterval: String = _

@Benchmark
@OperationsPerInvocation(1000) // should be same as numRequest
def benchRequestProcessing(): Unit = {
Expand Down Expand Up @@ -69,7 +73,9 @@ class H2ClientServerBenchmark extends CommonBenchmark with H2RequestResponseBenc
def setup(): Unit = {
initRequestResponse()

system = ActorSystem("PekkoHttpBenchmarkSystem", config)
val resetFrameConfig = ConfigFactory.parseString(
s"pekko.http.server.http2.reset-frame.throttle-interval=$resetFrameThrottleInterval")
system = ActorSystem("PekkoHttpBenchmarkSystem", resetFrameConfig.withFallback(config))
val settings = implicitly[ServerSettings]
val log = system.log
implicit val ec = system.dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# New configs added to support throttling HTTP/2 reset frames
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.resetFrameThrottleCost")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.resetFrameThrottleBurst")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.scaladsl.settings.Http2ServerSettings.resetFrameThrottleInterval")
9 changes: 9 additions & 0 deletions http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,15 @@ pekko.http {
# Fail the connection if a sent ping is not acknowledged within this timeout.
# When zero the ping-interval is used, if set the value must be evenly divisible by less than or equal to the ping-interval.
ping-timeout = 0s

reset-frame {
# Configure the throttle for Reset Frames (https://github.com/apache/incubator-pekko-http/issues/332)
throttle-cost = 100
throttle-burst = 100
# setting pekko.http.server.http2.reset-frame.throttle-interval to 0s will disable the throttle
# try changing it to 1s if you want to apply a meaningful limit (the cost and burst apply to the interval time)
throttle-interval = 0s
}
}

websocket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import pekko.http.scaladsl.settings.{
ParserSettings,
ServerSettings
}
import pekko.stream.{ BidiShape, Graph, StreamTcpException }
import pekko.stream.{ BidiShape, Graph, StreamTcpException, ThrottleMode }
import pekko.stream.TLSProtocol._
import pekko.stream.scaladsl.{ BidiFlow, Flow, Keep, Source }
import pekko.util.ByteString
Expand Down Expand Up @@ -122,12 +122,17 @@ private[http] object Http2Blueprint {
telemetry: TelemetrySpi,
dateHeaderRendering: DateHeaderRendering): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, ServerTerminator] = {
val masterHttpHeaderParser = HttpHeaderParser(settings.parserSettings, log) // FIXME: reuse for framing
telemetry.serverConnection atop
val flow0 = telemetry.serverConnection atop
httpLayer(settings, log, dateHeaderRendering) atopKeepRight
serverDemux(settings.http2Settings, initialDemuxerSettings, upgraded) atop
FrameLogger.logFramesIfEnabled(settings.http2Settings.logFrames) atop // enable for debugging
hpackCoding(masterHttpHeaderParser, settings.parserSettings) atop
framing(log) atop
hpackCoding(masterHttpHeaderParser, settings.parserSettings)

val flow1 = if (settings.http2Settings.resetFrameThrottleInterval.toMillis > 0) {
flow0 atop rapidResetMitigation(settings.http2Settings) atopKeepLeft framing(log)
} else flow0 atop framing(log)

flow1 atop
errorHandling(log) atop
idleTimeoutIfConfigured(settings.idleTimeout)
}
Expand Down Expand Up @@ -198,6 +203,19 @@ private[http] object Http2Blueprint {
Flow[FrameEvent].map(FrameRenderer.render).prepend(Source.single(Http2Protocol.ClientConnectionPreface)),
Flow[ByteString].via(new Http2FrameParsing(shouldReadPreface = false, log)))

private def rapidResetMitigation(
settings: Http2ServerSettings): BidiFlow[FrameEvent, FrameEvent, FrameEvent, FrameEvent, NotUsed] = {
def frameCost(event: FrameEvent): Int = event match {
case _: FrameEvent.RstStreamFrame => 1
case _ => 0
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be contantFunc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what constantFunc means. Do you mean making this a val instead of a def?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, make just like the Keep.left/right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 7156492 the sort of change that you want?


BidiFlow.fromFlows(
Flow[FrameEvent],
Flow[FrameEvent].throttle(settings.resetFrameThrottleCost, settings.resetFrameThrottleInterval,
settings.resetFrameThrottleBurst, frameCost, ThrottleMode.Enforcing))
}

/**
* Runs hpack encoding and decoding. Incoming frames that are processed are HEADERS and CONTINUATION.
* Outgoing frame is ParsedHeadersFrame.
Expand Down Expand Up @@ -290,5 +308,9 @@ private[http] object Http2Blueprint {
def atopKeepRight[OO1, II2, Mat2](
other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat2] =
bidi.atopMat(other)(Keep.right)

def atopKeepLeft[OO1, II2, Mat2](
other: Graph[BidiShape[O1, OO1, II2, I2], Mat2]): BidiFlow[I1, OO1, II2, O2, Mat] =
bidi.atopMat(other)(Keep.left)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ trait Http2ServerSettings {

def getPingTimeout: Duration = Duration.ofMillis(pingTimeout.toMillis)
def withPingTimeout(timeout: Duration): Http2ServerSettings = withPingTimeout(timeout.toMillis.millis)

def getResetThrottleCost(): Int = resetFrameThrottleCost
def getResetThrottleBurst(): Int = resetFrameThrottleBurst

def getResetThrottleInterval: Duration = Duration.ofMillis(resetFrameThrottleInterval.toMillis)

def withResetThrottleInterval(interval: Duration): Http2ServerSettings =
withResetThrottleInterval(interval.toMillis.millis)
}
object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
def create(config: Config): Http2ServerSettings = scaladsl.settings.Http2ServerSettings(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ trait Http2ServerSettings extends javadsl.settings.Http2ServerSettings with Http
def pingTimeout: FiniteDuration
def withPingTimeout(timeout: FiniteDuration): Http2ServerSettings = copy(pingTimeout = timeout)

def resetFrameThrottleCost: Int
def withResetThrottleCost(cost: Int) = copy(resetFrameThrottleCost = cost)

def resetFrameThrottleBurst: Int
def withResetThrottleBurst(burst: Int) = copy(resetFrameThrottleBurst = burst)

def resetFrameThrottleInterval: FiniteDuration
def withResetThrottleInterval(interval: FiniteDuration) = copy(resetFrameThrottleInterval = interval)

@InternalApi
private[http] def internalSettings: Option[Http2InternalServerSettings]
@InternalApi
Expand All @@ -124,6 +133,9 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
logFrames: Boolean,
pingInterval: FiniteDuration,
pingTimeout: FiniteDuration,
resetFrameThrottleCost: Int,
resetFrameThrottleBurst: Int,
resetFrameThrottleInterval: FiniteDuration,
internalSettings: Option[Http2InternalServerSettings])
extends Http2ServerSettings {
require(maxConcurrentStreams >= 0, "max-concurrent-streams must be >= 0")
Expand Down Expand Up @@ -151,6 +163,9 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {
logFrames = c.getBoolean("log-frames"),
pingInterval = c.getFiniteDuration("ping-interval"),
pingTimeout = c.getFiniteDuration("ping-timeout"),
resetFrameThrottleCost = c.getInt("reset-frame.throttle-cost"),
resetFrameThrottleBurst = c.getInt("reset-frame.throttle-burst"),
resetFrameThrottleInterval = c.getFiniteDuration("reset-frame.throttle-interval"),
None // no possibility to configure internal settings with config
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite =>
*/
protected def failOnSevereMessages: Boolean = false

/**
* We expect a severe message but the message should contain this text. If there are any other severe messages,
* the test will fail.
*/
protected val expectSevereLogsOnlyToMatch: Option[String] = None

/**
* Can be overridden to adapt which events should be considered as severe if `failOnSevereMessages` is
* enabled.
Expand Down Expand Up @@ -86,6 +92,19 @@ trait WithLogCapturing extends SuiteMixin { this: TestSuite =>
Failed(new AssertionError(
s"No severe log messages should be emitted during test run but got [${stats(
Logging.WarningLevel)}] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)"))
} else if (expectSevereLogsOnlyToMatch.nonEmpty) {
val severeEvents = events.filter(isSevere(_))
val matchingEvents = severeEvents.filter(_.message.toString.contains(expectSevereLogsOnlyToMatch.get))
if (severeEvents.isEmpty || matchingEvents != severeEvents) {
val stats = events.groupBy(_.level).mapValues(_.size).toMap.withDefaultValue(0)
flushLog()

Failed(new AssertionError(
s"Expected an error during test run but got unexpected results - got [${
stats(
Logging.WarningLevel)
}] warnings and [${stats(Logging.ErrorLevel)}] errors (see marked lines above)"))
} else res
} else res

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.http.impl.engine.http2.Http2Protocol.FrameType
import pekko.http.impl.engine.http2.framing.FrameRenderer
import pekko.util.ByteStringBuilder

import java.nio.ByteOrder

/**
* This tests the http2 server throttle support for rapid resets is disabled by default.
*/
class Http2ServerDisableResetThrottleSpec extends Http2SpecWithMaterializer("""
pekko.http.server.remote-address-header = on
pekko.http.server.http2.log-frames = on
""") {
override def failOnSevereMessages: Boolean = true

"The Http/2 server implementation" should {
"not cancel connection during rapid reset attack (throttle disabled)".inAssertAllStagesStopped(
new TestSetup with RequestResponseProbes {
implicit val bigEndian: ByteOrder = ByteOrder.BIG_ENDIAN
val bb = new ByteStringBuilder
bb.putInt(0)
val rstFrame = FrameRenderer.renderFrame(FrameType.RST_STREAM, ByteFlag.Zero, 1, bb.result())
val longFrame = Seq.fill(1000)(rstFrame).reduce(_ ++ _)
network.sendBytes(longFrame)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.http.impl.engine.http2

import org.apache.pekko
import pekko.http.impl.engine.http2.Http2Protocol.FrameType
import pekko.http.impl.engine.http2.framing.FrameRenderer
import pekko.util.ByteStringBuilder

import java.nio.ByteOrder

/**
* This tests the http2 server throttle support for rapid resets.
*/
class Http2ServerEnableResetThrottleSpec extends Http2SpecWithMaterializer("""
pekko.http.server.remote-address-header = on
pekko.http.server.http2.log-frames = on
pekko.http.server.http2.reset-frame.throttle-interval = 1s
""") {
override val expectSevereLogsOnlyToMatch: Option[String] = Some(
"HTTP2 connection failed with error [Maximum throttle throughput exceeded.]. Sending INTERNAL_ERROR and closing connection.")

"The Http/2 server implementation" should {
"cancel connection during rapid reset attack".inAssertAllStagesStopped(new TestSetup with RequestResponseProbes {
implicit val bigEndian: ByteOrder = ByteOrder.BIG_ENDIAN
val bb = new ByteStringBuilder
bb.putInt(0)
val rstFrame = FrameRenderer.renderFrame(FrameType.RST_STREAM, ByteFlag.Zero, 1, bb.result())
val longFrame = Seq.fill(1000)(rstFrame).reduce(_ ++ _)
network.sendBytes(longFrame)
})
}
}
Loading
Loading