Netty server side cancellation #3256
Conversation
- class NettyServerHandler[F[_]](route: Route[F], unsafeRunAsync: (() => F[Unit]) => Unit, maxContentLength: Option[Int])(implicit
-     me: MonadError[F]
+ class NettyServerHandler[F[_]](route: Route[F], unsafeRunAsync: (() => F[Unit]) => (() => Future[Unit]), maxContentLength: Option[Int])(
I'd add a comment that unsafeRunAsync returns a function to cancel the execution.
Comment added, and the signature extended; it's even more complex now ;)
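For illustration, the shape of the extended signature (start an effect, get back a callback that requests cancellation and completes once the work has stopped) can be sketched with plain threads. The PR itself uses the CE3 Dispatcher and ZIO Runtime for this; `CancellableRunner` and its names below are hypothetical:

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical stdlib-only sketch: run some work asynchronously and return a
// callback that requests cancellation. The returned Future completes when the
// work has actually stopped (assuming the work responds to interruption).
object CancellableRunner {
  def run(work: () => Unit): () => Future[Unit] = {
    val stopped = Promise[Unit]()
    val thread = new Thread(() => {
      try work()
      catch { case _: InterruptedException => () } // cancellation requested
      finally stopped.success(())                  // signal that we stopped
    })
    thread.start()
    // The cancellation callback: interrupt the worker, hand back a Future
    // that completes once the worker has terminated.
    () => { thread.interrupt(); stopped.future }
  }
}
```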
.ensure(me.eval(req.release()))
.ensure {
  me.eval {
    pendingResponses.dequeue()
doesn't this make the assumption that requests will end in the same order in which they started?
They are requests from the same channel, so maybe that's OK. Anyway, in the http4s implementation there are additional elements like eventLoopContext and pendingResponses. I think they are crucial to make sure this ordering works as expected, so I'll study them further.
@@ -67,7 +74,7 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
        config,
        new NettyServerHandler[RIO[R, *]](
          route,
-         (f: () => RIO[R, Unit]) => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(f())),
+         unsafeRunAsync(runtime),
maybe we can add a test for either ZIO or cats, verifying that a long-running request is indeed cancelled (something similar was present in the http4s PR)
if (ctx.channel.isActive) {
  initHandler(ctx)
}
override def channelActive(ctx: ChannelHandlerContext): Unit = initHandler(ctx)
is it so that either channelActive OR handlerAdded is called? won't both be called?
Indeed sometimes both can be called, but now I've added a check in initHandler which will prevent double listener registration.
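A sketch of such a guard, assuming the listener registration must happen exactly once even when both handlerAdded and channelActive invoke initHandler (the class and names below are illustrative, not the PR's code):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative idempotent initialization: the first caller wins, later calls
// are no-ops. An AtomicBoolean is defensive; Netty handlers normally run on a
// single event loop thread, where a plain var would also suffice.
final class InitOnce(registerListener: () => Unit) {
  private val initialized = new AtomicBoolean(false)
  def initHandler(): Unit =
    if (initialized.compareAndSet(false, true)) registerListener()
}
```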
override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
  logger.trace(s"channelReadComplete: ctx = $ctx")
  // The normal response to read complete is to issue another read,
hm, is that needed for the pendingResponses logic?
These ctx.read() calls are IMO unnecessary. For some reason http4s sets AUTO_READ=false, but I don't see this interfering with our implementation. Reactive streams also work as expected.
- Might be needed for slow CI
endpoint
  .out(plainBody[String])
  .serverLogic { _ =>
    (m.eval(canceledSemaphore.acquire())) >> (async.sleep(15.seconds) >> pureResult("processing finished".asRight[Unit]))
I think I don't understand the async.sleep: we're waiting on the semaphore, and releasing it only after the cancel signal comes in; do we still need to sleep after that?
The semaphore will be acquired immediately here; the sleep is to simulate long processing. The semaphore is used to block the test code until the backend's onCancel finishes setting the boolean.
ah, I mixed up the release/acquire directions ;) yeah, it's fine of course :)
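The handshake described above can be sketched with a plain java.util.concurrent.Semaphore (simplified, with hypothetical names; in the real test these steps run on different threads):

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Illustrative cancellation handshake: the server logic grabs the only permit
// up front, the cancellation callback releases it after recording the flag,
// and the test blocks on tryAcquire until that has happened.
object CancellationHandshake {
  @volatile var canceled = false
  val semaphore = new Semaphore(1)

  def serverLogicStarts(): Unit = semaphore.acquire() // permit taken immediately
  def onCancel(): Unit = { canceled = true; semaphore.release() }
  def testObservedCancellation(timeoutSeconds: Long): Boolean =
    semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS) && canceled
}
```

This also explains the tryAcquire in the assertion: the test re-acquires the permit that only onCancel releases, so a successful tryAcquire proves the cancellation callback ran.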
case _: SttpClientException.TimeoutException => // expected, this is how we trigger client-side cancellation
  IO(
    assert(
      canceledSemaphore.tryAcquire(30L, TimeUnit.SECONDS),
I don't get this either: the semaphore was released & acquired in the server/cancellation logic, so why acquire it here again?
    me: MonadError[F]
) extends SimpleChannelInboundHandler[HttpRequest] {

// By using the Netty event loop assigned to this channel we get two benefits:
let's maybe mention here that this is copied from http4s's code, just to maintain proper attribution :)
// We keep track of the cancellation tokens for all the requests in flight. This gives us
// observability into the number of requests in flight and the ability to cancel them all
// if the connection gets closed.
private[this] val pendingResponses = MutableQueue.empty[() => Future[Unit]]
did you manage to drill down and understand why this is a queue? can you have multiple ongoing requests? is it HTTP/1 only?
If I understand correctly this is HTTP/2, so Netty can ingest a request, dispatch async processing to another thread pool, and pick up the next request. The responses will be returned in order though, even if request 2 finishes before request 1. https://medium.com/@akhaku/netty-data-model-threading-and-gotchas-cab820e4815a
Edit: Turns out HTTP/2 multiplexing is a different beast, which requires a special setup of Netty, even an additional library (netty-codec-http2). It's powerful, because it allows opening a single connection, sending multiple requests, and getting responses in any order; the protocol takes care of this. However, it's something different from what our server is capable of.
With this code, we are implementing support for cancellation with HTTP/1.1 pipelining, where a client can send multiple requests without waiting for the response to the first, and the server will process and respond to them in order. One of the main challenges is that responses must be returned in order, which can introduce head-of-line blocking if processing one request takes longer than others. HTTP/2 addresses these issues with the aforementioned multiplexing.
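The ordering constraint can be illustrated with plain Futures: chaining each write after the previous one forces responses out in request order, even when a later request finishes processing first. (`writeInOrder` is an illustrative helper, not the PR's code.)

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative HTTP/1.1 pipelining constraint: responses must be written in
// request order, so each write is chained after the previous one completes.
// This is also where head-of-line blocking comes from: a slow response 1
// delays an already-finished response 2.
def writeInOrder(responses: List[Future[String]], write: String => Unit): Future[Unit] =
  responses.foldLeft(Future.unit) { (acc, r) =>
    acc.flatMap(_ => r.map(write)) // write r only after all earlier writes
  }
```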
pendingResponses.dequeue()
try {
  handleResponse(ctx, req, serverResponse)
  releaseReq()
don't we have to run this in case of exceptions as well? the .ensure in the old version always released it
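One way to restore the old .ensure guarantee, sketched with a plain try/finally (simplified, hypothetical signature):

```scala
// Illustrative sketch of the concern above: release the request even when
// handling the response throws, mirroring what .ensure guaranteed before.
def respondAndRelease(handleResponse: () => Unit, releaseReq: () => Unit): Unit =
  try handleResponse()
  finally releaseReq()
```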
import java.util.concurrent.{Semaphore, TimeUnit}
import scala.concurrent.duration._

class ServerCancellationTests[F[_], OPTIONS, ROUTE](createServerTest: CreateServerTest[F, Any, OPTIONS, ROUTE])(implicit
nice test :) I wonder which other backends support cancellation in this way ... but let's leave this for another task ;)
also, this will be interesting to integrate into tapir-loom, but that's also another story
Fixes #2682
This PR changes the way effects are executed "into a Future", which is currently done by the CE3 Dispatcher or the ZIO Runtime. The change uses them differently, so that we get a cancellation callback, which can later be run by
NettyServerHandler
in order to attempt cancellation of long-running requests when the client disconnects. For the raw Netty Future server, the cancellation callback is a no-op; I guess there's no way to cancel standard Scala Futures without adding quite some complexity (see https://viktorklang.com/blog/Futures-in-Scala-protips-6.html).
Based on https://github.com/http4s/http4s-netty/pull/396/files