Skip to content

Commit

Permalink
=core Make use of statefulMap instead of statefulMapConcat.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 1, 2023
1 parent 6781b5a commit 0405ca9
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2016-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko

import org.apache.pekko
import pekko.stream.Attributes
import pekko.stream.Outlet
import pekko.stream.SourceShape
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.OutHandler

/**
* Emits integers from 1 to the given `elementCount`. The `java.lang.Integer`
* objects are allocated in the constructor of the operator, so it should be created
* before the benchmark is started.
*/
class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] {

private val elements = new Array[java.lang.Integer](elementCount)
(1 to elementCount).foreach(n => elements(n - 1) = n)

val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource")
override val shape: SourceShape[java.lang.Integer] = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {

var n = 0

override def onPull(): Unit = {
n += 1
if (n > elementCount)
complete(out)
else
push(out, elements(n - 1))
}

setHandler(out, this)
}
}

class BenchTestSourceSameElement[T](elements: Int, elem: T) extends GraphStage[SourceShape[T]] {

val out: Outlet[T] = Outlet("BenchTestSourceSameElement")
override val shape: SourceShape[T] = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {

var n = 0

override def onPull(): Unit = {
n += 1
if (n > elements)
complete(out)
else
push(out, elem)
}

setHandler(out, this)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.impl.engine

import com.typesafe.config.ConfigFactory

import org.apache.pekko
import pekko.{ BenchTestSourceSameElement, NotUsed }
import pekko.actor.ActorSystem
import pekko.http.impl.engine.ws.FrameEvent
import pekko.http.impl.engine.ws.Protocol.Opcode
import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
import pekko.util.ByteString
import org.openjdk.jmh.annotations.{
Benchmark,
BenchmarkMode,
Mode,
OperationsPerInvocation,
OutputTimeUnit,
Scope,
State,
TearDown
}

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.{ Duration, DurationInt }

object MessageToFrameRendererBenchmark {
final val OperationsPerInvocation = 100000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class MessageToFrameRendererBenchmark {
import MessageToFrameRendererBenchmark.OperationsPerInvocation

private val config = ConfigFactory.parseString("""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 1
}
}
""")

private implicit val system: ActorSystem = ActorSystem("MessageToFrameRendererBenchmark", config)

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}

def createSource(count: Int): Source[ByteString, NotUsed] =
Source.fromGraph(new BenchTestSourceSameElement(count, ByteString.empty))

private val newstreamedFrames = createSource(OperationsPerInvocation)
.statefulMap(() => true)((isFirst, data) => {
val frameOpcode = if (isFirst) Opcode.Pong else Opcode.Continuation
(false, FrameEvent.fullFrame(frameOpcode, None, data, fin = false))
}, _ => None)
.toMat(Sink.ignore)(Keep.right)

private val oldstreamedFrames = createSource(OperationsPerInvocation)
.via(statefulMap(() => {
var isFirst = true

{ data =>
val frameOpcode =
if (isFirst) {
isFirst = false
Opcode.Pong
} else Opcode.Continuation

FrameEvent.fullFrame(frameOpcode, None, data, fin = false)
}
}))
.toMat(Sink.ignore)(Keep.right)

def statefulMap[T, U](functionConstructor: () => T => U): Flow[T, U, NotUsed] =
Flow[T].statefulMapConcat { () =>
val f = functionConstructor()
i => f(i) :: Nil
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldStreamedFrames(): Unit =
Await.result(oldstreamedFrames.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchNewStreamedFrames(): Unit =
Await.result(newstreamedFrames.run(), Duration.Inf)

}
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,8 @@ private[http] object Http2Blueprint {
def httpLayerClient(masterHttpHeaderParser: HttpHeaderParser, settings: ClientConnectionSettings, log: LoggingAdapter)
: BidiFlow[HttpRequest, Http2SubStream, Http2SubStream, HttpResponse, NotUsed] =
BidiFlow.fromFlows(
Flow[HttpRequest].statefulMapConcat { () =>
val renderer = new RequestRendering(settings, log)
request => renderer(request) :: Nil
},
Flow[HttpRequest].statefulMap(() => new RequestRendering(settings, log))((renderer, request) =>
(renderer, renderer(request)), _ => None),
StreamUtils.statefulAttrsMap[Http2SubStream, HttpResponse] { attrs =>
val headerParser = masterHttpHeaderParser.createShallowCopy()
stream => ResponseParsing.parseResponse(headerParser, settings.parserSettings, attrs)(stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import pekko.util.ByteString
import pekko.stream.scaladsl.{ Flow, Source }
import Protocol.Opcode
import pekko.annotation.InternalApi
import pekko.http.impl.util.StreamUtils
import pekko.http.scaladsl.model.ws._

/**
Expand All @@ -35,20 +34,10 @@ private[http] object MessageToFrameRenderer {
Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true))

def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Any] =
data.via(StreamUtils.statefulMap(() => {
var isFirst = true

{ data =>
val frameOpcode =
if (isFirst) {
isFirst = false
opcode
} else Opcode.Continuation

FrameEvent.fullFrame(frameOpcode, None, data, fin = false)
}
})) ++
Source.single(FrameEvent.emptyLastContinuationFrame)
data.statefulMap(() => true)((isFirst, data) => {
val frameOpcode = if (isFirst) opcode else Opcode.Continuation
(false, FrameEvent.fullFrame(frameOpcode, None, data, fin = false))
}, _ => None) ++ Source.single(FrameEvent.emptyLastContinuationFrame)

Flow[Message]
.flatMapConcat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,6 @@ private[http] object StreamUtils {
}
}

/**
* Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map.
*/
def statefulMap[T, U](functionConstructor: () => T => U): Flow[T, U, NotUsed] =
Flow[T].statefulMapConcat { () =>
val f = functionConstructor()
i => f(i) :: Nil
}

/**
* Lifts the streams attributes into an element and passes them to the function for each passed through element.
* Similar idea than [[FlowOps.statefulMapConcat]] but for a simple map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class RequestParsingSpec extends PekkoSpecWithMaterializer with Inside with Insp
.map(parseRequest)
.runWith(Sink.head)
.futureValue
catch { case ex => throw ex.getCause } // unpack futureValue exceptions
catch { case ex: Throwable => throw ex.getCause } // unpack futureValue exceptions
}

def shouldThrowMalformedRequest[T](block: => T): Exception = {
Expand Down

0 comments on commit 0405ca9

Please sign in to comment.