diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 62e62052fd..ba221ece11 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -43,37 +43,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing */ def stream(implicit ev: R <:< Unit): Stream[F, O] = { val _ = ev - Stream.fromFreeC(this.scope.get[F, O, Unit]) + Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get) } - // For binary compatibility with 1.0.x - private[Pull] def stream: Stream[F, O] = - Stream.fromFreeC(this.scope.get[F, O, Unit]) - - /** - * Like [[stream]] but no scope is inserted around the pull, resulting in any resources being - * promoted to the current scope of the stream, extending the resource lifetime. Typically used - * as a performance optimization, where resource lifetime can be extended in exchange for faster - * execution. - * - * Caution: this can result in resources with greatly extended lifecycles if the pull - * discards parts of the stream from which it was created. This could lead to memory leaks - * so be very careful when using this function. For example, a pull that emits the first element - * and discards the tail could discard the release of one or more resources that were acquired - * in order to produce the first element. Normally, these resources would be registered in the - * scope generated by the pull-to-stream conversion and hence released as part of that scope - * closing but when using `streamNoScope`, they get promoted to the current stream scope, - * which may be infinite in the worst case. - */ - def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = { - val _ = ev - Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit]) - } - - // For binary compatibility with 1.0.x - private[Pull] def streamNoScope: Stream[F, O] = - Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit]) - /** Applies the resource of this pull to `f` and returns the result. */ def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] = Pull.fromFreeC(get[F2, O2, R].flatMap(r => f(r).get)) @@ -109,9 +81,6 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing h: Throwable => Pull[F2, O2, R2]): Pull[F2, O2, R2] = Pull.fromFreeC(get[F2, O2, R2].handleErrorWith(e => h(e).get)) - /** Tracks any resources acquired during this pull and releases them when the pull completes. */ - def scope: Pull[F, O, Unit] = Pull.fromFreeC(Algebra.scope(get[F, O, R].map(_ => ()))) - /** Discards the result type of this pull. */ def void: Pull[F, O, Unit] = as(()) } @@ -143,9 +112,7 @@ object Pull extends PullLowPriority { } /** - * Acquire a resource within a `Pull`. The cleanup action will be run at the end - * of the `.stream` scope which executes the returned `Pull`. The acquired - * resource is returned as the result value of the pull. + * Acquire a resource within a `Pull`. The acquired resource is returned as the result value of the pull. */ def acquire[F[_]: RaiseThrowable, R](r: F[R])(cleanup: R => F[Unit]): Pull[F, INothing, R] = acquireCancellable(r)(cleanup).map(_.resource) @@ -153,8 +120,6 @@ object Pull extends PullLowPriority { /** * Like [[acquire]] but the result value consists of a cancellation * pull and the acquired resource. Running the cancellation pull frees the resource. - * This allows the acquired resource to be released earlier than at the end of the - * containing pull scope. */ def acquireCancellable[F[_]: RaiseThrowable, R](r: F[R])( cleanup: R => F[Unit]): Pull[F, INothing, Cancellable[F, R]] = diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 09249f3030..65a6580611 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -137,21 +137,22 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * Appends `s2` to the end of this stream. * * @example {{{ - * scala> ( Stream(1,2,3)++Stream(4,5,6) ).toList + * scala> (Stream(1,2,3) ++ Stream(4,5,6)).toList * res0: List[Int] = List(1, 2, 3, 4, 5, 6) * }}} * - * If `this` stream is not terminating, then the result is equivalent to `this`. + * If `this` stream is infinite, then the result is equivalent to `this`. */ - def ++[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] = append(s2) - - /** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */ - def append[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] = + def ++[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] = Stream.fromFreeC(get[F2, O2].transformWith { case Result.Pure(_) => s2.get case other => other.asFreeC[Algebra[F2, O2, ?]] }) + /** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */ + def append[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] = + this ++ s2 + /** * Alias for `_.map(_ => o2)`. * @@ -1361,7 +1362,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def handleErrorWith[F2[x] >: F[x], O2 >: O](h: Throwable => Stream[F2, O2]): Stream[F2, O2] = - Stream.fromFreeC(Algebra.scope(get[F2, O2]).handleErrorWith(e => h(e).get[F2, O2])) + Stream.fromFreeC(get[F2, O2].handleErrorWith(e => h(e).get[F2, O2])) /** * Emits the first element of this stream (if non-empty) and then halts. @@ -1603,7 +1604,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def map[O2](f: O => O2): Stream[F, O2] = - this.pull.echo.mapOutput(f).streamNoScope + this.pull.echo.mapOutput(f).stream /** * Maps a running total according to `S` and the input with the function `f`. @@ -1751,8 +1752,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, s.chunks .evalMap { chunk => guard.acquire >> - resultQ.enqueue1( - Some(Stream.chunk(chunk).onFinalize(guard.release).scope)) + resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release))) } .interruptWhen(interrupt.get.attempt) .compile @@ -2044,8 +2044,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, .dropWhile(_ > 0) .take(1) .compile - .drain >> signalResult) - .scope >> + .drain >> signalResult) >> outputQ.dequeue .flatMap(Stream.chunk(_).covary[F2]) @@ -2329,17 +2328,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, def scanMonoid[O2 >: O](implicit O: Monoid[O2]): Stream[F, O2] = scan(O.empty)(O.combine) - /** - * Scopes are typically inserted automatically, at the boundary of a pull (i.e., when a pull - * is converted to a stream). This method allows a scope to be explicitly demarcated so that - * resources can be freed earlier than when using automatically inserted scopes. This is - * useful when using `streamNoScope` to convert from `Pull` to `Stream` -- i.e., by choosing - * to *not* have scopes inserted automatically, you may end up needing to demarcate scopes - * manually at a higher level in the stream structure. - * - * Note: see the disclaimer about the use of `streamNoScope`. - */ - def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get)) + private def scope: Stream[F, O] = + Stream.fromFreeC(Algebra.scope(get)) /** * Writes this stream to the supplied `PrintStream`, converting each element to a `String` via `Show`. @@ -2937,7 +2927,7 @@ object Stream extends StreamLowPriority { release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, R] = fromFreeC(Algebra.acquire[F, R, R](acquire, release).flatMap { case (r, token) => Stream.emit(r).covary[F].get[F, R] - }) + }).scope /** * Like [[bracket]] but the result value consists of a cancellation @@ -2959,13 +2949,14 @@ object Stream extends StreamLowPriority { */ def bracketCaseCancellable[F[x] >: Pure[x], R](acquire: F[R])( release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, (Stream[F, Unit], R)] = - bracketWithResource(acquire)(release).map { - case (res, r) => - (Stream.eval(res.release(ExitCase.Canceled)).flatMap { - case Left(t) => Stream.fromFreeC(Algebra.raiseError[F, Unit](t)) - case Right(u) => Stream.emit(u) - }, r) - } + bracketWithResource(acquire)(release) + .map { + case (res, r) => + (Stream.eval(res.release(ExitCase.Canceled)).flatMap { + case Left(t) => Stream.fromFreeC(Algebra.raiseError[F, Unit](t)) + case Right(u) => Stream.emit(u) + }, r) + } private[fs2] def bracketWithResource[F[x] >: Pure[x], R](acquire: F[R])( release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, (fs2.internal.Resource[F], R)] = @@ -2976,7 +2967,7 @@ object Stream extends StreamLowPriority { .covary[F] .map(o => (res, o)) .get[F, (fs2.internal.Resource[F], R)] - }) + }).scope /** * Creates a pure stream that emits the elements of the supplied chunk. @@ -3966,7 +3957,7 @@ object Stream extends StreamLowPriority { resourceEval { F.delay(init()) - .flatMap(i => Algebra.compile(s.get, scope, i)(foldChunk)) + .flatMap(i => Algebra.compile(s.get, scope, true, i)(foldChunk)) .map(finalize) } } @@ -3978,7 +3969,8 @@ object Stream extends StreamLowPriority { private def compile[F[_], O, B](stream: FreeC[Algebra[F, O, ?], Unit], init: B)( f: (B, Chunk[O]) => B)(implicit F: Sync[F]): F[B] = F.bracketCase(CompileScope.newRoot[F])(scope => - Algebra.compile[F, O, B](stream, scope, init)(f))((scope, ec) => scope.close(ec).rethrow) + Algebra.compile[F, O, B](stream, scope, false, init)(f))((scope, ec) => + scope.close(ec).rethrow) implicit def syncInstance[F[_]](implicit F: Sync[F]): Compiler[F, F] = new Compiler[F, F] { def apply[O, B, C](s: Stream[F, O], init: () => B)(foldChunk: (B, Chunk[O]) => B, diff --git a/core/shared/src/main/scala/fs2/concurrent/Balance.scala b/core/shared/src/main/scala/fs2/concurrent/Balance.scala index c298d0b0c8..4bf5cef1d7 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Balance.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Balance.scala @@ -50,7 +50,6 @@ object Balance { .getStream(chunkSize) .unNoneTerminate .flatMap(Stream.chunk) - def push = source.chunks .evalMap(chunk => pubSub.publish(Some(chunk))) @@ -84,7 +83,8 @@ object Balance { def through[F[_]: Concurrent, O, O2](chunkSize: Int)(pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] = _.balance(chunkSize) .take(pipes.size) - .zipWith(Stream.emits(pipes)) { case (stream, pipe) => stream.through(pipe) } + .zipWithIndex + .map { case (stream, idx) => stream.through(pipes(idx.toInt)) } .parJoinUnbounded private def strategy[O]: PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] = diff --git a/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala b/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala index b6d46a7504..fca0afea63 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala @@ -72,7 +72,8 @@ object Broadcast { def through[F[_]: Concurrent, O, O2](pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] = _.through(apply(pipes.size)) .take(pipes.size) - .zipWith(Stream.emits(pipes)) { case (src, pipe) => src.through(pipe) } + .zipWithIndex + .map { case (src, idx) => src.through(pipes(idx.toInt)) } .parJoinUnbounded /** diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index 2ac9246412..9c693db133 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -132,7 +132,7 @@ private[fs2] object Algebra { case Result.Fail(err0) => raiseError(CompositeFailure(err, err0, Nil)) case Result.Interrupted(interruptedScopeId, _) => sys.error( - s"Impossible, cannot interrupt when closing failed scope: $scopeId, $interruptedScopeId $err") + s"Impossible, cannot interrupt when closing failed scope: $scopeId, $interruptedScopeId, $err") } case Result.Interrupted(ctx, err) => sys.error(s"Impossible context: $ctx") @@ -162,16 +162,19 @@ private[fs2] object Algebra { step(s, None).map { _.map { case (h, _, t) => (h, t) } } /** Left-folds the output of a stream. */ - def compile[F[_], O, B](stream: FreeC[Algebra[F, O, ?], Unit], scope: CompileScope[F], init: B)( - g: (B, Chunk[O]) => B)(implicit F: MonadError[F, Throwable]): F[B] = - compileLoop[F, O](scope, stream).flatMap { + def compile[F[_], O, B]( + stream: FreeC[Algebra[F, O, ?], Unit], + scope: CompileScope[F], + extendLastTopLevelScope: Boolean, + init: B)(g: (B, Chunk[O]) => B)(implicit F: MonadError[F, Throwable]): F[B] = + compileLoop[F, O](scope, extendLastTopLevelScope, stream).flatMap { case Some((output, scope, tail)) => try { val b = g(init, output) - compile(tail, scope, b)(g) + compile(tail, scope, extendLastTopLevelScope, b)(g) } catch { case NonFatal(err) => - compile(tail.asHandler(err), scope, init)(g) + compile(tail.asHandler(err), scope, extendLastTopLevelScope, init)(g) } case None => F.pure(init) @@ -198,11 +201,10 @@ private[fs2] object Algebra { * * Interpreter uses this to find any parents of this scope that has to be interrupted, and guards the * interruption so it won't propagate to scope that shall not be anymore interrupted. - * */ - private[this] def compileLoop[F[_], O]( scope: CompileScope[F], + extendLastTopLevelScope: Boolean, stream: FreeC[Algebra[F, O, ?], Unit] )(implicit F: MonadError[F, Throwable]) : F[Option[(Chunk[O], CompileScope[F], FreeC[Algebra[F, O, ?], Unit])]] = { @@ -210,6 +212,7 @@ private[fs2] object Algebra { def go[X, Res]( scope: CompileScope[F], compileCont: CompileCont[F, X, Res], + extendedTopLevelScope: Option[CompileScope[F]], stream: FreeC[Algebra[F, X, ?], Unit] ): F[Res] = stream.viewL match { @@ -227,7 +230,7 @@ private[fs2] object Algebra { case view: ViewL.View[Algebra[F, X, ?], y, Unit] => def resume(res: Result[y]): F[Res] = - go[X, Res](scope, compileCont, view.next(res)) + go[X, Res](scope, compileCont, extendedTopLevelScope, view.next(res)) def interruptGuard(scope: CompileScope[F])(next: => F[Res]): F[Res] = F.flatMap(scope.isInterrupted) { @@ -251,7 +254,7 @@ private[fs2] object Algebra { val handleStep = new CompileCont[F, y, Res] { def done(scope: CompileScope[F]): F[Res] = interruptGuard(scope)( - go(scope, compileCont, view.next(Result.pure(None))) + go(scope, compileCont, extendedTopLevelScope, view.next(Result.pure(None))) ) def out(head: Chunk[y], outScope: CompileScope[F], @@ -261,14 +264,15 @@ private[fs2] object Algebra { val nextScope = if (u.scope.isEmpty) outScope else scope val result = Result.pure(Some((head, outScope.id, tail))) interruptGuard(nextScope)( - go(nextScope, compileCont, view.next(result)) + go(nextScope, compileCont, extendedTopLevelScope, view.next(result)) ) } def interrupted(scopeId: Token, err: Option[Throwable]): F[Res] = resume(Result.interrupted(scopeId, err)) } - F.handleErrorWith(go[y, Res](stepScope, handleStep, u.stream)) { err => + F.handleErrorWith( + go[y, Res](stepScope, handleStep, extendedTopLevelScope, u.stream)) { err => resume(Result.raiseError(err)) } case None => @@ -297,11 +301,25 @@ private[fs2] object Algebra { case open: OpenScope[F] => interruptGuard(scope) { - F.flatMap(scope.open(open.interruptible)) { - case Left(err) => - go(scope, compileCont, view.next(Result.raiseError(err))) - case Right(childScope) => - go(childScope, compileCont, view.next(Result.pure(childScope.id))) + val maybeCloseExtendedScope: F[Boolean] = + // If we're opening a new top-level scope (aka, direct descendant of root), + // close the current extended top-level scope if it is defined. + if (scope.parent.isEmpty) + extendedTopLevelScope match { + case None => false.pure[F] + case Some(s) => s.close(ExitCase.Completed).rethrow.as(true) + } else F.pure(false) + maybeCloseExtendedScope.flatMap { closedExtendedScope => + val newExtendedScope = if (closedExtendedScope) None else extendedTopLevelScope + F.flatMap(scope.open(open.interruptible)) { + case Left(err) => + go(scope, compileCont, newExtendedScope, view.next(Result.raiseError(err))) + case Right(childScope) => + go(childScope, + compileCont, + newExtendedScope, + view.next(Result.pure(childScope.id))) + } } } @@ -325,27 +343,34 @@ private[fs2] object Algebra { } } - - go(ancestor, compileCont, view.next(res)) + go(ancestor, compileCont, extendedTopLevelScope, view.next(res)) } } - scope.findSelfOrAncestor(close.scopeId) match { - case Some(toClose) => closeAndGo(toClose, close.exitCase) + val scopeToClose = scope + .findSelfOrAncestor(close.scopeId) + .pure[F] + .orElse(scope.findSelfOrChild(close.scopeId)) + scopeToClose.flatMap { + case Some(toClose) => + if (toClose.parent.isEmpty) { + // Impossible - don't close root scope as a result of a `CloseScope` call + go(scope, compileCont, extendedTopLevelScope, view.next(Result.pure(()))) + } else if (extendLastTopLevelScope && toClose.parent.flatMap(_.parent).isEmpty) { + // Request to close the current top-level scope - if we're supposed to extend + // it instead, leave the scope open and pass it to the continuation + extendedTopLevelScope.traverse_(_.close(ExitCase.Completed).rethrow) *> + toClose.openAncestor.flatMap(ancestor => + go(ancestor, compileCont, Some(toClose), view.next(Result.pure(())))) + } else closeAndGo(toClose, close.exitCase) case None => - scope.findSelfOrChild(close.scopeId).flatMap { - case Some(toClose) => - closeAndGo(toClose, close.exitCase) - case None => - // scope already closed, continue with current scope - def result = - close.interruptedScope - .map { Result.interrupted _ tupled } - .getOrElse(Result.unit) - go(scope, compileCont, view.next(result)) - } + // scope already closed, continue with current scope + def result = + close.interruptedScope + .map { Result.interrupted _ tupled } + .getOrElse(Result.unit) + go(scope, compileCont, extendedTopLevelScope, view.next(result)) } - } } @@ -366,7 +391,7 @@ private[fs2] object Algebra { } } - go(scope, CompileEnd, stream) + go(scope, CompileEnd, None, stream) } /** @@ -401,7 +426,10 @@ private[fs2] object Algebra { view.step match { case close: CloseScope[F] => Algebra - .closeScope(close.scopeId, Some((interruptedScope, interruptedError)), close.exitCase) // assumes it is impossible so the `close` will be already from interrupted stream + .closeScope( + close.scopeId, + Some((interruptedScope, interruptedError)), + ExitCase.Canceled) // Inner scope is getting closed b/c a parent was interrupted .transformWith(view.next) case _ => // all other cases insert interruption cause diff --git a/core/shared/src/main/scala/fs2/internal/CompileScope.scala b/core/shared/src/main/scala/fs2/internal/CompileScope.scala index 15cf8dbc42..b50815e2ce 100644 --- a/core/shared/src/main/scala/fs2/internal/CompileScope.scala +++ b/core/shared/src/main/scala/fs2/internal/CompileScope.scala @@ -63,7 +63,7 @@ import fs2.internal.CompileScope.InterruptContext */ private[fs2] final class CompileScope[F[_]] private ( val id: Token, - private val parent: Option[CompileScope[F]], + val parent: Option[CompileScope[F]], val interruptible: Option[InterruptContext[F]] )(implicit val F: Sync[F]) extends Scope[F] { self => @@ -86,8 +86,7 @@ private[fs2] final class CompileScope[F[_]] private ( * If this scope is currently closed, then the child scope is opened on the first * open ancestor of this scope. * - * Returns scope that has to be used in next compilation step and the next stream - * to be evaluated. + * Returns scope that has to be used in next compilation step. */ def open( interruptible: Option[Concurrent[F]] @@ -385,7 +384,7 @@ private[fs2] final class CompileScope[F[_]] private ( } override def toString = - s"RunFoldScope(id=$id,interruptible=${interruptible.nonEmpty})" + s"CompileScope(id=$id,interruptible=${interruptible.nonEmpty})" } private[fs2] object CompileScope { diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index 995a69a8e0..9b27ceb4b4 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -87,11 +87,11 @@ class StreamSpec extends Fs2Spec { } } - "bracket.scope ++ bracket" - { + "bracket ++ bracket" - { def appendBracketTest[F[_]: Sync, A](use1: Stream[F, A], use2: Stream[F, A]): F[Unit] = for { events <- Ref.of[F, Vector[BracketEvent]](Vector.empty) - _ <- recordBracketEvents(events).scope + _ <- recordBracketEvents(events) .flatMap(_ => use1) .append(recordBracketEvents(events).flatMap(_ => use2)) .compile @@ -144,24 +144,24 @@ class StreamSpec extends Fs2Spec { "finalizer should not be called until necessary" in { IO.suspend { - val buffer = collection.mutable.ListBuffer[Symbol]() + val buffer = collection.mutable.ListBuffer[String]() Stream - .bracket(IO(buffer += 'Acquired)) { _ => - buffer += 'ReleaseInvoked - IO(buffer += 'Released).void + .bracket(IO(buffer += "Acquired")) { _ => + buffer += "ReleaseInvoked" + IO(buffer += "Released").void } .flatMap { _ => - buffer += 'Used + buffer += "Used" Stream.emit(()) } .flatMap { s => - buffer += 'FlatMapped + buffer += "FlatMapped" Stream(s) } .compile .toList .asserting { _ => - buffer.toList shouldBe List('Acquired, 'Used, 'FlatMapped, 'ReleaseInvoked, 'Released) + buffer.toList shouldBe List("Acquired", "Used", "FlatMapped", "ReleaseInvoked", "Released") } } } @@ -292,19 +292,21 @@ class StreamSpec extends Fs2Spec { } } - "interruption" in forAll { (s0: Stream[Pure, Int]) => - Counter[IO].flatMap { counter => - var ecs: Chain[ExitCase[Throwable]] = Chain.empty - val s = - Stream - .bracketCase(counter.increment) { (_, ec) => - counter.decrement >> IO { ecs = ecs :+ ec } - } - .flatMap(_ => s0 ++ Stream.never[IO]) - s.interruptAfter(50.millis).compile.drain.flatMap(_ => counter.get).asserting { count => - count shouldBe 0L - ecs.toList.foreach(_ shouldBe ExitCase.Canceled) - Succeeded + "interruption" in { + forAll { (s0: Stream[Pure, Int]) => + Counter[IO].flatMap { counter => + var ecs: Chain[ExitCase[Throwable]] = Chain.empty + val s = + Stream + .bracketCase(counter.increment) { (_, ec) => + counter.decrement >> IO { ecs = ecs :+ ec } + } + .flatMap(_ => s0 ++ Stream.never[IO]) + s.interruptAfter(50.millis).compile.drain.flatMap(_ => counter.get).asserting { count => + count shouldBe 0L + ecs.toList.foreach(_ shouldBe ExitCase.Canceled) + Succeeded + } } } } @@ -579,6 +581,37 @@ class StreamSpec extends Fs2Spec { .asserting(_ shouldBe expected) } + "last scope extended, not all scopes" - { + "1" in { + Ref[IO] + .of(List.empty[String]) + .flatMap { st => + def record(s: String): IO[Unit] = st.update(_ :+ s) + Stream.emit("start") + .onFinalize(record("first finalize")) + .onFinalize(record("second finalize")) + .compile + .resource + .lastOrError + .use(x => record(x)) *> st.get + }.asserting(_ shouldBe List("first finalize", "start", "second finalize")) + } + "2" in { + Ref[IO] + .of(List.empty[String]) + .flatMap { st => + def record(s: String): IO[Unit] = st.update(_ :+ s) + (Stream.bracket(IO("a"))(_ => record("first finalize")) ++ + Stream.bracket(IO("b"))(_ => record("second finalize")) ++ + Stream.bracket(IO("c"))(_ => record("third finalize"))) + .compile + .resource + .lastOrError + .use(x => record(x)) *> st.get + }.asserting(_ shouldBe List("first finalize", "second finalize", "c", "third finalize")) + } + } + "allocated" in { Ref[IO] .of(false) @@ -655,7 +688,6 @@ class StreamSpec extends Fs2Spec { val bg = Stream.repeatEval(IO(1) *> IO.sleep(50.millis)).onFinalize(semaphore.release) val fg = Stream.raiseError[IO](new Err).delayBy(25.millis) fg.concurrently(bg) - .scope .onFinalize(semaphore.acquire) } .compile @@ -671,7 +703,6 @@ class StreamSpec extends Fs2Spec { val bg = Stream.repeatEval(IO(1) *> IO.sleep(50.millis)).onFinalize(semaphore.release) val fg = s.delayBy[IO](25.millis) fg.concurrently(bg) - .scope .onFinalize(semaphore.acquire) } .compile @@ -1614,6 +1645,7 @@ class StreamSpec extends Fs2Spec { case Some((hd, tl)) => Pull.eval(IO.never) } .stream + .interruptScope .append(Stream(5)) .compile .toList @@ -2717,42 +2749,6 @@ class StreamSpec extends Fs2Spec { v.drop(1).scanLeft(h)(f)) } - "scope" - { - "1" in { - val c = new java.util.concurrent.atomic.AtomicLong(0) - val s1 = Stream.emit("a").covary[IO] - val s2 = Stream - .bracket(IO { c.incrementAndGet() shouldBe 1L; () }) { _ => - IO { c.decrementAndGet(); () } - } - .flatMap(_ => Stream.emit("b")) - (s1.scope ++ s2) - .take(2) - .scope - .repeat - .take(4) - .merge(Stream.eval_(IO.unit)) - .compile - .drain - .asserting(_ => c.get shouldBe 0L) - } - - "2" in { - Stream - .eval(Ref.of[IO, Int](0)) - .flatMap { ref => - Stream(1).flatMap { i => - Stream - .bracket(ref.update(_ + 1))(_ => ref.update(_ - 1)) - .flatMap(_ => Stream.eval(ref.get)) ++ Stream.eval(ref.get) - }.scope ++ Stream.eval(ref.get) - } - .compile - .toList - .asserting(_ shouldBe List(1, 1, 0)) - } - } - "sleep" in { val delay = 200.millis // force a sync up in duration, then measure how long sleep takes @@ -3225,13 +3221,6 @@ class StreamSpec extends Fs2Spec { as.zipAll(ones)("Z", "2").take(3).toList shouldBe List("A" -> "1", "A" -> "1", "A" -> "1") } - "zip with scopes" in { - // this tests that streams opening resources on each branch will close - // scopes independently. - val s = Stream(0).scope - (s ++ s).zip(s).toList shouldBe List((0, 0)) - } - "issue #1120 - zip with uncons" in { // this tests we can properly look up scopes for the zipped streams val rangeStream = Stream.emits((0 to 3).toList) @@ -3328,7 +3317,7 @@ class StreamSpec extends Fs2Spec { "#1107 - scope" in { Stream(0) .covary[IO] - .scope // Create a source that opens/closes a new scope for every element emitted + .onFinalize(IO.unit) .repeat .take(10000) .flatMap(_ => Stream.empty) // Never emit an element downstream diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 213efba6f5..ea69202840 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -105,7 +105,7 @@ package object io { val os = if (closeAfterUse) Stream.bracket(fos)(os => blocker.delay(os.close())) else Stream.eval(fos) - os.flatMap(os => useOs(os).scope ++ Stream.eval(blocker.delay(os.flush()))) + os.flatMap(os => useOs(os) ++ Stream.eval(blocker.delay(os.flush()))) } // diff --git a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala index dac982bc98..101c509ac2 100644 --- a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala +++ b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala @@ -37,7 +37,6 @@ class SocketSpec extends Fs2Spec { .reads(1024) .through(socket.writes()) .onFinalize(socket.endOfOutput) - .scope } } .parJoinUnbounded @@ -52,8 +51,7 @@ class SocketSpec extends Fs2Spec { .chunk(message) .through(socket.writes()) .drain - .onFinalize(socket.endOfOutput) - .scope ++ + .onFinalize(socket.endOfOutput) ++ socket.reads(1024, None).chunks.map(_.toArray) } } @@ -95,7 +93,6 @@ class SocketSpec extends Fs2Spec { .through(socket.writes()) .drain .onFinalize(socket.endOfOutput) - .scope }) } .parJoinUnbounded