Skip to content

Commit

Permalink
Better handling of Empty entity in Akka an Pekko BodyListener. (#4140)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz Zakarczemny <[email protected]>
  • Loading branch information
Matzz and Mateusz Zakarczemny authored Nov 8, 2024
1 parent 21d2aeb commit fd55044
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AkkaBodyListener(implicit ec: ExecutionContext) extends BodyListener[Futur
override def onComplete(body: AkkaResponseBody)(cb: Try[Unit] => Future[Unit]): Future[AkkaResponseBody] = {
body match {
case ws @ Left(_) => cb(Success(())).map(_ => ws)
case Right(e @ HttpEntity.Empty) =>
case Right(e) if e.isKnownEmpty =>
Future.successful(Right(e)).andThen { case _ => cb(Success(())) }
case Right(e: UniversalEntity) =>
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ import sttp.capabilities.akka.AkkaStreams
import sttp.client3._
import sttp.client3.akkahttp.AkkaHttpBackend
import sttp.model.sse.ServerSentEvent
import sttp.model.Header
import sttp.model.MediaType
import sttp.monad.FutureMonad
import sttp.monad.syntax._
import sttp.tapir._
import sttp.tapir.server.interceptor._
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.server.metrics.{EndpointMetric, Metric}
import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}

import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.util.Random

Expand Down Expand Up @@ -150,6 +154,58 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {
r.header("Content-Length") shouldBe Some("0")
r.header("Transfer-Encoding") shouldBe None
}
},
Test("execute metrics interceptors for empty body and json content type") {
val e = endpoint.post.in(stringBody)
.out(stringBody)
.out(header(Header.contentType(MediaType.ApplicationJson)))
.serverLogicSuccess[Future](body => Future.successful(body))

class DummyMetric {
val onRequestCnt = new AtomicInteger(0)
val onEndpointRequestCnt = new AtomicInteger(0)
val onResponseHeadersCnt = new AtomicInteger(0)
val onResponseBodyCnt = new AtomicInteger(0)
}
val metric = new DummyMetric()
val customMetrics: Metric[Future, DummyMetric] =
Metric(
metric = metric,
onRequest = (_, metric, me) =>
me.eval {
metric.onRequestCnt.incrementAndGet()
EndpointMetric(
onEndpointRequest = Some((_) =>
me.eval(metric.onEndpointRequestCnt.incrementAndGet()),
),
onResponseHeaders = Some((_, _) =>
me.eval(metric.onResponseHeadersCnt.incrementAndGet()),
),
onResponseBody = Some((_, _) =>
me.eval(metric.onResponseBodyCnt.incrementAndGet()),
),
onException = None,
)
},
)
val route = AkkaHttpServerInterpreter(
AkkaHttpServerOptions.customiseInterceptors
.metricsInterceptor(new MetricsRequestInterceptor[Future](List(customMetrics), Seq.empty))
.options
).toRoute(e)

interpreter
.server(NonEmptyList.of(route))
.use { port =>
basicRequest.post(uri"http://localhost:$port").body("").send(backend).map { response =>
response.body shouldBe Right("")
metric.onRequestCnt.get() shouldBe 1
metric.onEndpointRequestCnt.get() shouldBe 1
metric.onResponseHeadersCnt.get() shouldBe 1
metric.onResponseBodyCnt.get() shouldBe 1
}
}
.unsafeToFuture()
}
)
def drainAkka(stream: AkkaStreams.BinaryStream): Future[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PekkoBodyListener(implicit ec: ExecutionContext) extends BodyListener[Futu
override def onComplete(body: PekkoResponseBody)(cb: Try[Unit] => Future[Unit]): Future[PekkoResponseBody] = {
body match {
case ws @ Left(_) => cb(Success(())).map(_ => ws)
case Right(e @ HttpEntity.Empty) =>
case Right(e) if e.isKnownEmpty =>
Future.successful(Right(e)).andThen { case _ => cb(Success(())) }
case Right(e: UniversalEntity) =>
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ import sttp.capabilities.pekko.PekkoStreams
import sttp.client3._
import sttp.client3.pekkohttp.PekkoHttpBackend
import sttp.model.sse.ServerSentEvent
import sttp.model.Header
import sttp.model.MediaType
import sttp.monad.FutureMonad
import sttp.monad.syntax._
import sttp.tapir._
import sttp.tapir.server.interceptor._
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.server.metrics.{EndpointMetric, Metric}
import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}

import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.util.Random

Expand Down Expand Up @@ -98,6 +103,58 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {
basicRequest.post(uri"http://localhost:$port").body("test123").send(backend).map(_.body shouldBe Right("replaced"))
}
.unsafeToFuture()
},
Test("execute metrics interceptors for empty body and json content type") {
val e = endpoint.post.in(stringBody)
.out(stringBody)
.out(header(Header.contentType(MediaType.ApplicationJson)))
.serverLogicSuccess[Future](body => Future.successful(body))

class DummyMetric {
val onRequestCnt = new AtomicInteger(0)
val onEndpointRequestCnt = new AtomicInteger(0)
val onResponseHeadersCnt = new AtomicInteger(0)
val onResponseBodyCnt = new AtomicInteger(0)
}
val metric = new DummyMetric()
val customMetrics: Metric[Future, DummyMetric] =
Metric(
metric = metric,
onRequest = (_, metric, me) =>
me.eval {
metric.onRequestCnt.incrementAndGet()
EndpointMetric(
onEndpointRequest = Some((_) =>
me.eval(metric.onEndpointRequestCnt.incrementAndGet()),
),
onResponseHeaders = Some((_, _) =>
me.eval(metric.onResponseHeadersCnt.incrementAndGet()),
),
onResponseBody = Some((_, _) =>
me.eval(metric.onResponseBodyCnt.incrementAndGet()),
),
onException = None,
)
},
)
val route = PekkoHttpServerInterpreter(
PekkoHttpServerOptions.customiseInterceptors
.metricsInterceptor(new MetricsRequestInterceptor[Future](List(customMetrics), Seq.empty))
.options
).toRoute(e)

interpreter
.server(NonEmptyList.of(route))
.use { port =>
basicRequest.post(uri"http://localhost:$port").body("").send(backend).map { response =>
response.body shouldBe Right("")
metric.onRequestCnt.get() shouldBe 1
metric.onEndpointRequestCnt.get() shouldBe 1
metric.onResponseHeadersCnt.get() shouldBe 1
metric.onResponseBodyCnt.get() shouldBe 1
}
}
.unsafeToFuture()
}
)
def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] =
Expand Down

0 comments on commit fd55044

Please sign in to comment.