Skip to content

Commit

Permalink
=str Add benchmark for MessageToFrameRenderer change.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Aug 31, 2023
1 parent 6ded069 commit 75064cd
Showing 1 changed file with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.apache.pekko.http.impl.engine

import com.typesafe.config.ConfigFactory
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.impl.engine.ws.FrameEvent
import org.apache.pekko.http.impl.engine.ws.Protocol.Opcode
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
import org.apache.pekko.util.ByteString
import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OperationsPerInvocation,
OutputTimeUnit,
Scope,
State,
TearDown
}

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.{ Duration, DurationInt }

object MessageToFrameRendererBenchmark {
final val OperationsPerInvocation = 100000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class MessageToFrameRendererBenchmark {
import MessageToFrameRendererBenchmark.OperationsPerInvocation

private val config = ConfigFactory.parseString("""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 1
}
}
""")

private implicit val system: ActorSystem = ActorSystem("MessageToFrameRendererBenchmark", config)

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}

private val newstreamedFrames = Source.repeat(ByteString.empty)
.take(OperationsPerInvocation)
.statefulMap(() => true)((isFirst, data) => {
val frameOpcode = if (isFirst) Opcode.Pong else Opcode.Continuation
(false, FrameEvent.fullFrame(frameOpcode, None, data, fin = false))
}, _ => None)
.toMat(Sink.ignore)(Keep.right)

private val oldstreamedFrames =
Source
.repeat(ByteString.empty)
.take(OperationsPerInvocation)
.via(statefulMap(() => {
var isFirst = true

{ data =>
val frameOpcode =
if (isFirst) {
isFirst = false
Opcode.Pong
} else Opcode.Continuation

FrameEvent.fullFrame(frameOpcode, None, data, fin = false)
}
}))
.toMat(Sink.ignore)(Keep.right)

def statefulMap[T, U](functionConstructor: () => T => U): Flow[T, U, NotUsed] =
Flow[T].statefulMapConcat { () =>
val f = functionConstructor()
i => f(i) :: Nil
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldStreamedFrames(): Unit =
Await.result(oldstreamedFrames.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchNewStreamedFrames(): Unit =
Await.result(newstreamedFrames.run(), Duration.Inf)

}

0 comments on commit 75064cd

Please sign in to comment.