Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support graceful shutdown in Netty server #3294

Merged
merged 16 commits into from
Nov 7, 2023
Prev Previous commit
Review fixes
kciesielski committed Nov 7, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit f2eacdcecbc096c245cca008f4df954a09bb6d8d
2 changes: 1 addition & 1 deletion doc/server/netty.md
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ NettyFutureServer(NettyConfig.defaultNoStreaming.socketBacklog(256))

## Graceful shutdown

A Netty server has to be properly closed using function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources.
A Netty should can be gracefully closed using function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should can? ;) maybe just run it through chatgpt to iron out the english :)

You can customize this behavior in `NettyConfig`:

```scala mdoc:compile-only
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues {

val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)
implicit val ioSleeper: Sleeper[IO] = new Sleeper[IO] {
val ioSleeper: Sleeper[IO] = new Sleeper[IO] {
override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration)
}

@@ -39,7 +39,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests() ++
new ServerCancellationTests(createServerTest)(m, IO.asyncForIO).tests() ++
new NettyFs2StreamingCancellationTest(createServerTest).tests() ++
new ServerGracefulShutdownTests(createServerTest).tests()
new ServerGracefulShutdownTests(createServerTest, ioSleeper).tests()

IO.pure((tests, eventLoopGroup))
} { case (_, eventLoopGroup) =>
Original file line number Diff line number Diff line change
@@ -9,8 +9,6 @@ import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}

import scala.concurrent.Future
import Sleeper._
import scala.concurrent.duration._

class NettyFutureServerTest extends TestSuite with EitherValues {
override def tests: Resource[IO, List[Test]] =
@@ -24,7 +22,7 @@ class NettyFutureServerTest extends TestSuite with EitherValues {
val createServerTest = new DefaultCreateServerTest(backend, interpreter)

val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++
new ServerGracefulShutdownTests(createServerTest).tests()
new ServerGracefulShutdownTests(createServerTest, Sleeper.futureSleeper).tests()

(tests, eventLoopGroup)
}) { case (_, eventLoopGroup) =>
Original file line number Diff line number Diff line change
@@ -25,15 +25,15 @@ class NettyZioServerTest extends TestSuite with EitherValues {

val interpreter = new NettyZioTestServerInterpreter(eventLoopGroup)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)
implicit val zioSleeper: Sleeper[Task] = new Sleeper[Task] {
val zioSleeper: Sleeper[Task] = new Sleeper[Task] {
override def sleep(duration: FiniteDuration): Task[Unit] = ZIO.sleep(zio.Duration.fromScala(duration))
}

val tests =
new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false).tests() ++
new ServerStreamingTests(createServerTest, ZioStreams).tests() ++
new ServerCancellationTests(createServerTest)(monadError, asyncInstance).tests() ++
new ServerGracefulShutdownTests(createServerTest).tests()
new ServerGracefulShutdownTests(createServerTest, zioSleeper).tests()

IO.pure((tests, eventLoopGroup))
} { case (_, eventLoopGroup) =>
Original file line number Diff line number Diff line change
@@ -47,6 +47,9 @@ trait CreateServerTest[F[_], +R, OPTIONS, ROUTE] {
runTest: (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion]
): Test

/** Override for a server to allow running tests which have access to a stop() effect, allowing shutting down the server within the test.
* By default, this method just uses a no-op IO.unit.
*/
def testServerWithStop(name: String, rs: => NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration])(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly here ... maybe we can reduce the number of variants here by just supporting the stop-variants, and the others would delegate here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these used in fact? :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok they are ... still seems we might have to many similar variants, making this hard to understand

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I didn't want to mess up other servers, so I thought to make the default implementation implement testServerWithStop in terms of testServer. I'll add some scaladoc to make it less confusing for now.

runTest: IO[Unit] => (SttpBackend[IO, Fs2Streams[IO] with WebSockets], Uri) => IO[Assertion]
): Test = testServer(name, rs)(runTest(IO.unit))
Original file line number Diff line number Diff line change
@@ -13,9 +13,8 @@ import sttp.tapir.tests._

import scala.concurrent.duration._

class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE])(implicit
m: MonadError[F],
sleeper: Sleeper[F]
class ServerGracefulShutdownTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE], sleeper: Sleeper[F])(implicit
m: MonadError[F]
) extends EitherValues {
import createServerTest._

Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ trait Sleeper[F[_]] {
}

object Sleeper {
implicit def futureSleeper(implicit ec: ExecutionContext): Sleeper[Future] = new Sleeper[Future] {
def futureSleeper(implicit ec: ExecutionContext): Sleeper[Future] = new Sleeper[Future] {
override def sleep(duration: FiniteDuration): Future[Unit] = Future {
blocking {
Thread.sleep(duration.toMillis)