
Maybe force sequencing of finalizers, on cancelation #267

Closed
alexandru opened this issue Jun 7, 2018 · 15 comments

@alexandru (Member) commented Jun 7, 2018

val prg = IO.unit
    .guarantee(IO(println("release 1 - enter")) *> Timer[IO].sleep(500.millis) *> IO(println("release 1 - exit")))
    .guarantee(IO(println("release 2 - enter")) *> Timer[IO].sleep(500.millis) *> IO(println("release 2 - exit")))

prg.start.flatMap(fiber => fiber.cancel *> fiber.join)

If we let this run to completion without cancellation, the output is:

release 1 - enter
release 1 - exit

release 2 - enter
release 2 - exit

Cancellation, on the other hand, is concurrent. What happens right now is that the finalizers get triggered in order, but without forced sequencing, so with asynchronous finalizers (which we force here by using sleep) we can get a non-deterministic result, like this:

release 1 - enter
release 2 - enter

release 1 - exit
release 2 - exit

Or this:

release 1 - enter
release 2 - enter

release 2 - exit
release 1 - exit

This can cause problems, for example in IOApp. As a user reported, due to our implementation the following doesn't work as one might expect:

import scala.concurrent.Promise
import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.ExitCase.{Canceled, Completed, Error}
import cats.syntax.apply._

object Uncancellable extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val Timeout = 2.millis
    val promise = Promise[Unit]() // will never finish
    val loooongIO = IO.fromFuture(IO(promise.future))

    for {
      _ <- IO(println("test uncancellable"))
      _ <- loooongIO.guaranteeCase {
        case Completed  => IO(println("Completed"))
        case Canceled   => IO(println("Canceled: Before")) *> IO.sleep(Timeout) *> IO(println("Canceled: After"))
        case Error(err) => IO(println(s"Error($err)"))
      }
    } yield ExitCode.Success
  }
}

So when the user interrupts this app (via SIGINT or SIGABORT) that finalizer doesn't get a chance to run, because there's no back-pressuring between it and the app's shutdown hook. And that's because finalizers aren't sequenced on cancellation.

The questions are:

  1. should finalizers be sequenced on cancellation?
  2. if not, how should we fix the IOApp sample above?
     • actually, should this sample be fixed or should it be considered normal behavior?

Why Aren't Finalizers Sequenced On Cancellation?

The builder for cancellable IOs, which sort of reflects the internal ADT, is this:

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A]
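For a concrete illustration (a minimal sketch of my own; the scheduler wiring is an assumption, not something from this issue), the value returned from the registration block is what the run-loop triggers on cancel:

import java.util.concurrent.{Executors, TimeUnit}
import cats.effect.IO

// Hypothetical example: a sleep built on a ScheduledExecutorService.
// The IO value returned at the end is the cancellation token that the
// run-loop triggers, fire-and-forget, when the task gets cancelled.
val scheduler = Executors.newSingleThreadScheduledExecutor()

def sleep(millis: Long): IO[Unit] =
  IO.cancelable { cb =>
    val future = scheduler.schedule(
      new Runnable { def run(): Unit = cb(Right(())) },
      millis,
      TimeUnit.MILLISECONDS
    )
    IO { future.cancel(false); () }
  }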

However, that IO[Unit] returned by k is a replacement for () => Unit and isn't back-pressured on cancellation. The reason is twofold:

  1. cancellation happens in race conditions; once you cancel, you no longer care about the result, and freeing resources soon enough is always a best effort anyway, therefore it's a "fire and forget" event
     • in general, back-pressuring on the result of a cancellation event is a really bad idea in network protocols; people have had bad experiences with TCP's protocol for closing connections
  2. cancellation is an event that's concurrent with the run-loop processing of that task, which means that whatever you do in your cancellation logic or finalizers has to be synchronized somehow; so consider what happens if you end up with ...
bracket(acquire1) { _ => bracket(acquire2)(use2)(release2) } (release1)

If release1 depends on release2, then on cancellation you need some sort of synchronization anyway for the impure, side-effectful parts being suspended, so it wouldn't be a stretch to suggest that such synchronization should also cover sequencing the finalizers when needed.
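To illustrate, here's a minimal sketch (my own, assuming cats-effect's Deferred; it is not the proposed fix) of how release1 can be made to wait on release2 explicitly, regardless of whether the runtime back-pressures on finalizers:

import cats.effect.Concurrent
import cats.effect.concurrent.Deferred
import cats.implicits._

// release1 depends on release2 having finished, so the two finalizers
// synchronize explicitly through a Deferred latch.
def nested[F[_]](implicit F: Concurrent[F]): F[Unit] =
  Deferred[F, Unit].flatMap { innerDone =>
    val release2 = F.delay(println("release 2")) *> innerDone.complete(())
    val release1 = innerDone.get *> F.delay(println("release 1"))

    F.bracket(F.delay(println("acquire 1"))) { _ =>
      F.bracket(F.delay(println("acquire 2")))(_ => F.async[Unit](_ => ()))(_ => release2)
    }(_ => release1)
  }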

Pro-Sequencing of Finalizers on Cancellation

The mental model users have is that of flatMap so it makes sense for finalizers to be sequenced on cancellation as well.

@rossabaker built IOApp and wasn't aware that it would not work for this use-case. On reviewing his code, I also did not see a problem. So the current behavior can surprise even experienced developers.

It would also be consistent with how try/finally behaves, even in the face of InterruptedException, although that's not a very good argument to be honest.

Anti-Sequencing of Finalizers on Cancellation

As far as I'm aware, we'd have the only implementation that does this.

  1. Scalaz 8 does not do it
  2. ReactiveX does not do it
  3. I don't know how Haskell does it, but it's probably not doing it either

The issues that come up are these ...

Case 1 — the inner release blocks the execution of the outer release indefinitely, which can lead to unintended leakage: in a race condition the cancelling logic won't wait and won't care whether release1 got executed before proceeding. This is the main problem with back-pressuring on an acknowledgement of interruption in network protocols (see the arguments above):

bracket(acquire1) { _ => 
  bracket(acquire2)(use2)( IO.never )
} (release1)

Case 2 — the inner release triggers an error:

bracket(acquire1) { _ => 
  bracket(acquire2)(use2)( IO.raiseError(e) )
} (release)

This isn't problematic for our implementation, but it is for others. Specifically, given the way Scalaz 8 implements errors, finalizers are not allowed to throw; when they do, the errors are caught by the Fiber's error-handling mechanism. If you sequence finalizers, that's going to be a problem, because such errors in Scalaz blow up the Fiber's run-loop AFAIK.

So what this means is that we cannot add this as a law in Concurrent, and we cannot rely on it for polymorphic code.

The question is, what are we going to do about cats.effect.IO? Should we make it behave like this, or should we fix IOApp in some other way?


Just to make it clear: we can make IO behave like this on finalization; the question is, do we want to?

I'm conflicted on the right answer; I'm leaning towards making it safe, but safety in this case is in the eye of the beholder 🙂

/cc @rossabaker @jdegoes @gvolpe @iravid — with the pro and anti arguments brought above, I would appreciate feedback

alexandru changed the title from "Maybe force sequencing of finalizers, on cancelation, for cats.effect.IO" to "Maybe force sequencing of finalizers, on cancelation" on Jun 7, 2018
@iravid commented Jun 7, 2018

My use-case

This is just based on my intuition, would love to know if there's a better way of doing this.

Say I have a database connection:
def connection: Resource[F, Connection]

and an ETL process that depends on it (say, it reads from Kafka and writes messages to the database):
def etl(conn: Connection): Resource[F, Unit]
It has no return value, but I would like to install a finaliser on it, so I put it in a Resource.

So my program goes like this (untested, could be completely wrong :-)):

for {
  etlFiber <- (for {
                conn <- connection
                _    <- etl(conn)
                _    <- Resource.liftF(IO.never) // keeps the resources alive until the fiber is cancelled
              } yield ()).use(_ => IO.unit).start
  _ <- installShutdownHandler(etlFiber)
  _ <- etlFiber.join
} yield ()

installShutdownHandler would do something that'd trigger the fiber cancellation on an external signal.

Now, when cancelling the ETL fiber, I'd like the ETL to be closed before closing the connection.

Is there a better way to do this assuming finalisers are concurrent? I.e., to use external synchronization? I don't exactly understand how to do this since each Resource is unaware of other resources sequenced with it.

Regarding the sequencing cons

  • Treating resource cancellation as a best effort seems dubious to me; if the point is to guarantee proper finalisation, it should not be treated as a best effort thing.
  • Non-terminating finalisation is a great point; I'd argue that it's the caller's responsibility to add timeouts on finalisers.

@alexandru (Member Author)

Treating resource cancellation as a best effort seems dubious to me; if the point is to guarantee proper finalisation, it should not be treated as a best effort thing.

This is because cancellation isn't finalization, and expecting it to magically work as intended is a leaky abstraction:

  1. finalization is something that happens in sequence (e.g. this happens before that, thus establishing a happens-before relationship)
  2. cancellation is something that happens concurrently and may invoke finalization logic, but only under very specific circumstances

Consider this sample:

def unsafeReadAll(in: BufferedReader): String = {
    val builder = new StringBuilder
    var continue = true
    do { 
      val line = in.readLine()
      if (line != null) builder.append(line)
      else continue = false
    } while (continue)
    builder.toString
}

IO(new BufferedReader(???)).bracket { in =>
  IO(unsafeReadAll(in))
} { in =>
  IO(in.close())
}

On cancellation this piece of code will trigger in.close() concurrently with in.readLine(), thus triggering an IOException inside use, whose treatment is actually undefined for polymorphic code. The exception itself is not the issue, but rather that unsynchronized cancellation like this can lead to data corruption, a condition that doesn't happen with normal termination.

But then what if we do this:

def readFile(file: File): IO[String] =
  IO.async { cb =>
    ec.execute(() => cb(Right(unsafeReadAll(???))))
  }

Is there any way to actually cancel this task? The answer is no. You can pretend that you can cancel it in a race condition by closing the "gate" and ignoring its result, but that loop will keep on going for as long as it wants to. Even if the result is discarded afterwards, its side effects remain.

Of course you can pull off magic™; for example, on top of the JVM you could do a Thread.interrupt if you control that Thread, but then again:

  1. it's not something you can do after the fact, as a consumer of an IO[A], only as a producer of an IO[A], because IO is oblivious to threads, and for good reasons
  2. there's no trick you can pull inside a process that translates to over-the-network communication

I mean, consider what happens when a web server has to cancel a database query because the database server isn't responding, a situation that is precisely why you want cancellation baked into a system. Having to wait on an acknowledgement that the database received and processed the cancellation request, or even that the TCP connection was actually closed, is nuts and has been the source of much pain and suffering.

I've lost nights to TIME_WAIT and CLOSE_WAIT, with servers crashing because they reached the maximum number of open file handles, all due to trusting that clients go through the normal handshake for closing connections.

@iravid commented Jun 7, 2018

Completely agree with your points - the fact that cancellation is a thing in cats-effect should not lead users to believe that everything is magically cancellable. This is something that stumped me when Scalaz 8 was first announced: I didn't understand how a monolithic block of code like your loop could become magically cancellable.

But this could be addressed in docs, no? Something along the lines of "IO values are only cancellable between binds".

This is orthogonal to the issue you raised though: should finalization in normal conditions and finalisation under cancellation behave the same? I think this is what we are discussing; not the cancellation logic itself.

If they don't behave the same, I find it hard to see how I can write proper finalisers in Resource. We'd need a Resource that's similar to bracketCase.

@alexandru (Member Author)

To be honest I'm leaning towards sequencing finalizers on cancellation, however we are going against prior art and that's not a good situation to be in.

But yes, bracketCase makes a distinction between Completed and Canceled for the reasons mentioned, because Canceled can imply different finalizer logic than Completed.

And this can be a problem for Resource indeed. I don't remember why we didn't introduce ExitCase in its release, but I remember we talked about it on that PR and we couldn't do it for some reason. Also FS2 doesn't have ExitCase in its internal representation either.

IO values are only cancellable between binds

Well that's not a good description either. You can supply cancellation logic that does the right thing, for example that loop can cooperate with the cancel signal:

import java.io.BufferedReader
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

def readAll(in: BufferedReader, ec: ExecutionContext): IO[String] =
  IO.cancelable { cb =>
    // For cooperative synchronisation with the cancel signal
    val isActive = new AtomicBoolean(true)
    ec.execute { () =>
      val builder = new StringBuilder
      try {
        var continue = true
        do {
          val line = in.readLine()
          if (line != null) builder.append(line)
          else continue = false
        } while (continue && isActive.get)

        if (isActive.getAndSet(false))
          cb(Right(builder.toString))
      }
      catch {
        case NonFatal(e) => cb(Left(e))
      }
      finally {
        in.close()
      }
    }
    // The cancellation token: flips the flag so the loop stops cooperatively
    IO(isActive.set(false))
  }

An interesting exercise would be to rewrite this in terms of bracket. It's certainly possible, but it's challenging, because now the acquire part has to ensure thread-safe communication between use and release.
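For reference, one possible shape of that exercise (a sketch of my own, not a recommended pattern; it just shows where the synchronization burden lands):

import java.io.BufferedReader
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import cats.effect.IO
import cats.syntax.apply._

// acquire allocates the flag shared by use and release; use checks it on
// every iteration; release flips it so the loop stops. Closing `in` in
// release is still racy with readLine, which is exactly the thread-safety
// burden mentioned above.
def readAllViaBracket(in: BufferedReader, ec: ExecutionContext): IO[String] =
  IO(new AtomicBoolean(true)).bracket { isActive =>
    IO.async[String] { cb =>
      ec.execute { () =>
        try {
          val builder = new StringBuilder
          var continue = true
          do {
            val line = in.readLine()
            if (line != null) builder.append(line)
            else continue = false
          } while (continue && isActive.get)
          cb(Right(builder.toString))
        } catch {
          case NonFatal(e) => cb(Left(e))
        }
      }
    }
  } { isActive =>
    IO(isActive.set(false)) *> IO(in.close())
  }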

I would go on a tangent here about how (what I named) the "continual" model (e.g. #262) is saner in this regard, but I'll refrain 🙂

@iravid commented Jun 7, 2018

Right, there’s the cancellable builder too.

The main reason sequencing finalizers makes sense to me is consistency; I want the code to be run the same whether the bracketed IO program finishes successfully, errors out or is cancelled. This is the gist of my argument.

/steps off the bucket/

@jdegoes commented Jun 7, 2018

In composing nested Scalaz 8 brackets, IO provides the following guarantees:

  1. Finalizations will execute linearly, not concurrently.
  2. Finalizations will execute in the reverse order of acquisitions.
  3. Defects in inner finalizations will not stop outer finalizations from running.
  4. The cause of all terminations is reported to user-defined supervisors.

All of these guarantees hold whether a fiber is terminated due to typed error, defect, or interruption. In other words, interruption has no effect on the linearization or bullet-proofing of finalizers.

I'm disturbed that cats-effect does not codify (1) - (3) in laws, as they are extremely important to building leak-free software. If the implementations do not provide these guarantees, they should be fixed as quickly as possible to conform with the Scalaz 8 model.

@alexandru (Member Author)

@jdegoes that's good to know, but I tested your implementation and it has the same behavior, as described by this issue.

Finalizations will execute linearly, not concurrently.

In case of interruption, that's not true. Is that a bug or was that intended?

@jdegoes commented Jun 7, 2018

In case of interruption, that's not true. Is that a bug or was that intended?

They definitely execute linearly, not concurrently, but there could be a bug in the ordering. Do you have the code used to reproduce this?

@alexandru (Member Author) commented Jun 7, 2018

Ah, you're right, so you are sequencing the execution of finalisers. Except that I found a bug, some sort of race condition.

Sample 1 — in which the inner finalizer never gets executed:

    val task = IO.unit[Void].bracket[Unit] { _ =>
      IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1"))
    } { _ =>
      IO.unit[Void].bracket[Unit] { _ =>
        IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2"))
      } { _ =>
        IO.never[Void, Unit]
      }
    }
    task.fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)

This fails with:

java.lang.Error: Defect: Fiber is not in executing or async state
	at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
	at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
	at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
start 1
release 1

Sample 2 — the first specified finalizer gets executed twice somehow, which is really odd:

    IO.unit[Void]
      .bracket[Unit](_ => IO.sync[Void, Unit](println("start 1")) *> IO.sleep(1.second) *> IO.sync(println("release 1")))(_ => IO.unit[Void])
      .bracket[ExitStatus](_ => IO.sync[Void, Unit](println("start 2")) *> IO.sleep(1.second) *> IO.sync(println("release 2")))(_ => IO.sync(ExitStatus.ExitNow(0)))
      .fork.flatMap(f => f.interrupt(new RuntimeException("cancel")) *> IO.never)

Output:

start 1
start 1
java.lang.Error: Defect: Fiber is not in executing or async state
	at scalaz.ioeffect.RTS$FiberContext.enterAsyncStart(RTS.scala:931)
	at scalaz.ioeffect.RTS$FiberContext.evaluate(RTS.scala:514)
	at scalaz.ioeffect.RTS$FiberContext.$anonfun$fork$1(RTS.scala:761)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scalaz.ioeffect.RTS$$anon$1.run(RTS.scala:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
release 1

And this behavior is what made me jump to conclusions.

@alexandru (Member Author)

@jdegoes btw, thanks for the clarifications — I'm now more confident that sequencing stacked finalizers on cancellation is probably the way to go.

@jdegoes commented Jun 7, 2018

@alexandru Thanks for the bug report. I'll write this up in scalaz-ioz.

EDIT: TBI here.

@gvolpe (Member) commented Jun 8, 2018

My two cents:

  1. should finalizers be sequenced on cancellation?

I think this makes sense for nested brackets, as it'll be consistent with Resource's implementation, and it's what I, as a user, would expect.

  2. if not, how should we fix the IOApp sample above?

That's a tricky one. I'm not sure whether this (SIGINT, etc.) is still handled in Haskell or not, but I've seen some bug reports to GHC related to it. In such cases the release part never gets to run.

Another question: is the bracket implementation resilient to asynchronous exceptions? I mean, can the release part be interrupted by an async exception? I know that in Haskell this is managed by having an uninterruptibleMask in the bracket implementation, but I'm not completely sure how it's managed in Cats Effect.

@alexandru (Member Author)

We don't do async exceptions @gvolpe, that's a Haskell-ism, but it maps to cancellation, and yes, we've got laws for it: the release is both uncancelable and resilient to errors.

@alexandru (Member Author)

If there are no further objections, I think it's settled — we should sequence finalizers on cancellation.

This change is of medium difficulty — we need to change our internal IOConnection to work with IO objects instead of () => Unit references and do a fold of those references on cancel.
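Roughly speaking, the idea looks like this (a sketch only; the names and structure are assumptions, not the actual IOConnection internals):

import cats.effect.IO
import cats.syntax.apply._

// Each finalizer is back-pressured on the previous one, and an error in one
// finalizer doesn't prevent the rest from running (errors are swallowed here
// for brevity; a real implementation would report them).
def sequenceFinalizers(finalizers: List[IO[Unit]]): IO[Unit] =
  finalizers.foldLeft(IO.unit) { (acc, finalizer) =>
    acc *> finalizer.attempt.map(_ => ())
  }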

@alexandru (Member Author)

We should probably add a law; I'm not sure how we can express it, because we'd need to test that:

  1. the inner bracket was evaluated and the inner finalizer was installed
  2. the outer finalizer gets triggered only after the inner finalizer has terminated

We can probably do it through a combination of latches (aka Deferred), but it's gross and I'm not entirely sure that we want it as a law — although it does seem nice.
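For what it's worth, a rough sketch of the shape such a test could take (my own sketch; the ContextShift requirement and exact combinators depend on the cats-effect version, so treat it as an assumption rather than a proposed law statement):

import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.{Deferred, Ref}
import cats.implicits._

// The property being checked: when the fiber is cancelled, the outer
// finalizer observes that the inner finalizer has already finished.
def finalizersAreSequenced(implicit cs: ContextShift[IO]): IO[Boolean] =
  for {
    innerDone <- Ref.of[IO, Boolean](false)
    result    <- Deferred[IO, Boolean]
    task = IO.unit.bracket { _ =>
             IO.unit.bracket(_ => IO.never)(_ => innerDone.set(true))
           } { _ =>
             innerDone.get.flatMap(result.complete)
           }
    fiber <- task.start
    _     <- fiber.cancel
    ok    <- result.get
  } yield ok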
