-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change scoping behavior to open a scope on each bracket #1574
Change scoping behavior to open a scope on each bracket #1574
Conversation
@@ -43,37 +43,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing | |||
*/ | |||
def stream(implicit ev: R <:< Unit): Stream[F, O] = { | |||
val _ = ev | |||
Stream.fromFreeC(this.scope.get[F, O, Unit]) | |||
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note converting a Pull
to a Stream
no longer inserts a scope -- this is an optional part of this PR but this always felt super arbitrary to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense
*/ | ||
def acquire[F[_]: RaiseThrowable, R](r: F[R])(cleanup: R => F[Unit]): Pull[F, INothing, R] = | ||
acquireCancellable(r)(cleanup).map(_.resource) | ||
|
||
/** | ||
* Like [[acquire]] but the result value consists of a cancellation | ||
* pull and the acquired resource. Running the cancellation pull frees the resource. | ||
* This allows the acquired resource to be released earlier than at the end of the | ||
* containing pull scope. | ||
*/ | ||
def acquireCancellable[F[_]: RaiseThrowable, R](r: F[R])( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be deleted. I'll open a follow-up PR after this one is merged. IIRC, I added it when writing a custom pull that did log rotation -- the file handles were all opened in the same scope as a result of the custom pull. At the time, I couldn't find a way to close the old file handle when opening a new one. This was back in 0.9 with a significantly different pull API though.
@@ -1361,7 +1362,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, | |||
* }}} | |||
*/ | |||
def handleErrorWith[F2[x] >: F[x], O2 >: O](h: Throwable => Stream[F2, O2]): Stream[F2, O2] = | |||
Stream.fromFreeC(Algebra.scope(get[F2, O2]).handleErrorWith(e => h(e).get[F2, O2])) | |||
Stream.fromFreeC(get[F2, O2].handleErrorWith(e => h(e).get[F2, O2])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for a scope insertion here anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool. I remember this was done on the interruption, when we signalled it with the Throwable. If all interruption test go green this is ok.
* Note: see the disclaimer about the use of `streamNoScope`. | ||
*/ | ||
def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get)) | ||
private def scope(hard: Boolean): Stream[F, O] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now private as there's no need for manual scope control anymore.
@@ -3966,7 +3957,7 @@ object Stream extends StreamLowPriority { | |||
|
|||
resourceEval { | |||
F.delay(init()) | |||
.flatMap(i => Algebra.compile(s.get, scope, i)(foldChunk)) | |||
.flatMap(i => Algebra.compile(s.get, scope, true, i)(foldChunk)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using s.compile.resource
, we enable suppression of soft scopes (aka scopes that are a result of resource acquisition) for the root scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats an ultra-magic statement :-) I think we would need to clarify it for users.
@@ -84,7 +83,8 @@ object Balance { | |||
def through[F[_]: Concurrent, O, O2](chunkSize: Int)(pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] = | |||
_.balance(chunkSize) | |||
.take(pipes.size) | |||
.zipWith(Stream.emits(pipes)) { case (stream, pipe) => stream.through(pipe) } | |||
.zipWithIndex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This had to change as it makes an assumption of scopes involved when zipping. With the more granular scopes used here, the leasing of the step legs was not effective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. Also we do have some tests that were covering interleaving scopes. I wonder what these tests will do
Looks very nice @mpilquist |
ecs.toList.foreach(_ shouldBe ExitCase.Canceled) | ||
Succeeded | ||
"interruption" in { | ||
pending // Completes with ExitCase.Completed instead of expected Canceled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is currently broken, but I think it's for the same reason as #1531 and recent reports in Gitter -- https://gitter.im/functional-streams-for-scala/fs2?at=5d644fb6022dba538e66cdfe.
I think when there's more granular scopes, the interruption applies to innermost scope but then the parents are terminated w/ Completed
. I spent an hour or so looking in to this but couldn't figure it out. I think the issue is in Algebra#scope0
, where the handler for interruption closes the interrupted scope but then terminates with a Result.Pure(())
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember these was really hard to solve and reason about and took really long time to figure them out. Some of the changes on scope was purposely driven by these tests. I hope they won't block us now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, though I don't think the status quo is much better, assuming it's the cause of the other reported issues with ExitCase.Completed
appearing where ExitCase.Canceled
is expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay fixed, it was a very simple issue: 04b893e
@@ -1614,6 +1615,7 @@ class StreamSpec extends Fs2Spec { | |||
case Some((hd, tl)) => Pull.eval(IO.never) | |||
} | |||
.stream | |||
.interruptScope |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note b/c pulls no longer introduce scopes, I had to add this manually here. I think this is the right tradeoff though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree
@mpilquist Is there a particular reason that things need to be extended to the full resource scope? I guess def close[F[_]: Bracket[?[_], Throwable], A](r: Resource[F, A]): Resource[F, Unit] =
Resource.liftF(r.use(_ => unit[F])) Does that allow you to pinch things off sooner? |
@djspiewak It's admittedly fairly ad-hoc behavior but it's to enable programs like this: def resource =
Stream
.emit("resource - start")
.onFinalize(record("resource - done"))
.compile
.resource
.lastOrError
.use(x => f(x)) Here, we want |
@mpilquist Since we're in ad hoc land, can we just make the last scope extend to the end of the resource, but any prior scopes pinch off prior to that? |
@djspiewak That's a good idea and is consistent with the behavior Fabio wanted. I'll try to implement it... |
I'm a little confused about the api here:
This implies to me: val foo = Stream.bracket(r)(..) ++ s
foo.compile.drain // `r` is closed before `s` begins
foo.compile.resource.. // `r` is closed after `s` begins do I have that right? If so that feels pretty dangerous and non-composable to me :( |
@Daenyth Yeah, it's definitely strange at first glance. Take a look at ScalaDoc of compile.resource for motivation. |
What api would I use to guarantee that |
@Daenyth If Mike is able to implement my proposed semantics, then |
… scope is extended to resource lifecycle
I just pushed a commit that extends the scope of the last top-level scope (where top-level = direct descendant of root scope). Some examples: def r(tag: String) = Stream.bracket(IO(println("acquired " + tag)))(_ => IO(println("released " + tag)))
val a = r("1").compile.resource.lastOrError.use(_ => IO(println("using"))).unsafeRunSync
// acquired 1
// using
// released 1
val b = r("1").append(Stream(1)).compile.resource.lastOrError.use(_ => IO(println("using"))).unsafeRunSync
// acquired 1
// using
// released 1
val c = r("1").append(r("2")).compile.resource.lastOrError.use(_ => IO(println("using"))).unsafeRunSync
// acquired 1
// released 1
// acquired 2
// released 2
// acquired 3
// using
// released 3
val d = r("1").append(Stream.empty.onFinalize(IO(println("finalizer")))).compile.resource.lastOrError.use(_ => IO(println("using"))).unsafeRunSync
// acquired 1
// released 1
// using
// finalizer |
Still at Scala World, I'll have a few comments when I'm back but overall I'm pretty excited by how things are going :) |
One thing to be aware of -- in 1.0.{3,4,5}, def out(s: String): IO[Unit] = IO(println(s))
Stream(1)
.onFinalize(out("1"))
.onFinalize(out("2"))
.compile
.resource
.drain
.use(_ => out("using"))
.unsafeRunSync
// using
// 1
// 2 With this PR though: def out(s: String): IO[Unit] = IO(println(s))
Stream(1)
.onFinalize(out("1"))
.onFinalize(out("2"))
.compile
.resource
.drain
.use(_ => out("using"))
.unsafeRunSync
// 1
// using
// 2 The same problem exists for nested brackets -- e.g. Note: this isn't as bad as it looks as 1.0.{3,4,5} exhibits the same behavior if a scope is introduced between the finalizers, and in 1.0.{3,4,5}, scopes are introduced in many places implicitly. For example, the following insertion of Stream(1, 2, 3)
.onFinalize(out("1"))
.take(2)
.onFinalize(out("2"))
.compile
.resource
.drain
.use(_ => out("using"))
.unsafeRunSync
// 1
// using
// 2 |
What happens with a.concurrently(b).onFinalize(c).compile.resource |
r("1").concurrently(r("2").flatMap(_ => Stream.never[IO])).onFinalize(out("3")).compile.resource.lastOrError.use(_ => out("using")).unsafeRunSync
// acquired 1
// acquired 2
// released 1
// released 2
// using
// 3 |
@mpilquist that doesn't work then, right? |
Right, though it doesn't work today for many stream types. E.g., stick a |
I see -.-" Let's attack it from another angle maybe then. My use case for this has always been Maybe we can just make a |
On the other hand, our original goals were the behaviour of 1.0.2 + simplified interpreter (no Release) and we achieve that with this PR only |
The main use case I care about right now is The use case of |
FWIW, I use |
Yeah, me too
In library code though, exposing singleton Stream is confusing. |
|
I wonder if that last one can be fixed in |
@mpilquist your latest description conforms to my expectations.
So I would classify this PR as successful (thank you!). Apologies if I'm sounding extra picky, just a few points that maybe are worth exploring:
|
Slight correction: |
I've considered changing |
@mpilquist ah, you're right How about we do the opposite: I'd also like some clarification on what hard scopes are for, is it just interruption? |
I'll play with it some and see if there's something I can come up with. Thoughts on merging this as-is and following up with new PR for what comes out of that experiment? I'd like to get a milestone build out so http4s & other downstream projects can kick the tires a bit on the new scopes. |
Sorry, forgot to reply to your other 2 comments:
The strange part is
Not that I know of. I vaguely remember what you're referring to here but I think last time we tried this, we ran in to issues well before getting the full test suite passing. |
That's a pretty reasonable stance to take, but absent a spec for which way it should work, downstream libraries and applications will assume a kind of "reference implementation" semantic of what it does, and may not be unit testing that they get the specific behavior they want, instead assuming that the library semantics in the published version are the semantics and shouldn't be tested (in terms of "test your own code, assume your libraries work") |
Given that things now are more broken, I'm onboard with that (assuming Daniel and Pavel feel the same)
That's fair enough, yet on the other hand it did work in that code, but I don't think it's a blocker right now
I remember it differently, that we actually released that behaviour for a while and we found through an issue, I'd have to double check because I'm not sure |
I'm not sure if this is a known corner case or not, but considering that val r1 = Resource.make(IO(println("acquire 1")))(_ => IO(println("release 1")))
val r2 = Resource.make(IO(println("acquire 2")))(_ => IO(println("release 2"))) Stream.resource(r1)
.evalMap(_ => IO.unit)
.handleErrorWith(_ => Stream.emit(())) ++ Stream.resource(r2)
/* prints
acquire 1
release 1
acquire 2
release 2
*/ Stream.resource(r1)
.evalMap(_ => IO.raiseError(new RuntimeException))
.handleErrorWith(_ => Stream.emit(())) ++ Stream.resource(r2)
/* prints
acquire 1
acquire 2
release 2
release 1
*/ The obvious way to restore the expected behaviour (the former) is to add |
That really does seem odd. It's like the scope isn't getting closed correctly in the event that an error has propagated at any point? |
@djspiewak I think this change is the cause of that behaviour (I tried my example with the original version of |
Resolved #1535, alternative to #1562.
Scopes are now inserted fully automatically (no more
s.scope
) and are inserted around each call tobracket*
. The resulting behavior is much more intuitive -- e.g.bracket(r)(..) ++ s
now guarantees thatr
is released whens
is evaluated.One exception to the above is resources acquired in the root scope (or rather, resources acquired in a direct descendant of the root scope given that each call to
bracket*
creates a scope) and when usings.compile.resource
. Compiling to aResource[F, X]
extends the lifetime of such resources to the resource lifecycle. E.g.,(There are tests for this behavior in the current test suite).
To implement this exception case,
this PR introduces the internal notion of soft scopes and hard scopes. A soft scope is created for each bracket call whereas a hard scope is created in all other cases. When evaluatingThis PR was updated -- instead of soft/hard scopes, we now just delay closure of last top-level scope, where top-level is defined as a direct descendant of the root scope.s.compile.resource
, soft scopes opened off root are skipped.