diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index f03b04a4c2..32d86f9962 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -627,16 +627,22 @@ object Pull extends PullLowPriority { def apply(r: Terminal[Y]): Pull[F, O, Unit] = b.cont(r) } + // This class is not created by the combinators in the public Pull API, only during compilation + private class DelegateBind[F[_], O, Y]( + step: Pull[F, O, Y], + override val delegate: Bind[F, O, Y, Unit] + ) extends Bind[F, O, Y, Unit](step) { + def cont(yr: Terminal[Y]): Pull[F, O, Unit] = delegate.cont(yr) + } + + // This class is not created by combinators in public Pull API, only during compilation private class BindBind[F[_], O, X, Y]( + step: Pull[F, O, X], bb: Bind[F, O, X, Y], - delegate: Bind[F, O, Y, Unit] - ) extends Bind[F, O, X, Unit](bb.step) { self => - + del: Bind[F, O, Y, Unit] + ) extends Bind[F, O, X, Unit](step) { def cont(zr: Terminal[X]): Pull[F, O, Unit] = - new Bind[F, O, Y, Unit](bb.cont(zr)) { - override val delegate: Bind[F, O, Y, Unit] = self.delegate - def cont(yr: Terminal[Y]): Pull[F, O, Unit] = delegate.cont(yr) - } + new DelegateBind(bb.cont(zr), del) } private def viewL[F[_], O](stream: Pull[F, O, Unit]): ViewL[F, O] = { @@ -647,7 +653,7 @@ object Pull extends PullLowPriority { case e: Action[F, O, Unit] => new EvalView[F, O](e) case b: Bind[F, O, y, Unit] => b.step match { - case c: Bind[F, O, x, _] => mk(new BindBind[F, O, x, y](c, b.delegate)) + case c: Bind[F, O, x, _] => mk(new BindBind[F, O, x, y](c.step, c.delegate, b.delegate)) case e: Action[F, O, y2] => new BindView(e, b) case r: Terminal[_] => mk(b.cont(r)) } @@ -724,11 +730,27 @@ object Pull extends PullLowPriority { // `InterruptedScope` contains id of the scope currently being interrupted // together with any errors accumulated during interruption process - private final case class CloseScope( - scopeId: Unique.Token, - interruption: Option[Interrupted], - exitCase: ExitCase - ) extends AlgEffect[Pure, Unit] + private abstract class CloseScope extends AlgEffect[Pure, Unit] { + def scopeId: Unique.Token + def interruption: Option[Interrupted] + def exitCase: ExitCase + } + + private final case class SucceedScope(scopeId: Unique.Token) extends CloseScope { + def exitCase: ExitCase = ExitCase.Succeeded + def interruption: Option[Interrupted] = None + } + + private final case class CanceledScope(scopeId: Unique.Token, inter: Interrupted) + extends CloseScope { + def exitCase: ExitCase = ExitCase.Canceled + def interruption: Option[Interrupted] = Some(inter) + } + + private final case class FailedScope(scopeId: Unique.Token, err: Throwable) extends CloseScope { + def exitCase: ExitCase = ExitCase.Errored(err) + def interruption: Option[Interrupted] = None + } private final case class GetScope[F[_]]() extends AlgEffect[Pure, Scope[F]] @@ -1050,9 +1072,9 @@ object Pull extends PullLowPriority { ): F[End] = { def endScope(scopeId: Unique.Token, result: Terminal[Unit]): Pull[G, X, Unit] = result match { - case Succeeded(_) => CloseScope(scopeId, None, ExitCase.Succeeded) - case inter @ Interrupted(_, _) => CloseScope(scopeId, Some(inter), ExitCase.Canceled) - case Fail(err) => CloseScope(scopeId, None, ExitCase.Errored(err)) + case Succeeded(_) => SucceedScope(scopeId) + case inter @ Interrupted(_, _) => CanceledScope(scopeId, inter) + case Fail(err) => FailedScope(scopeId, err) } val maybeCloseExtendedScope: F[Option[Scope[F]]] = @@ -1245,9 +1267,9 @@ object Pull extends PullLowPriority { case view: View[F, O, _] => view.step match { - case CloseScope(scopeId, _, _) => + case cs: CloseScope => // Inner scope is getting closed b/c a parent was interrupted - val cl: Pull[F, O, Unit] = CloseScope(scopeId, Some(interruption), ExitCase.Canceled) + val cl: Pull[F, O, Unit] = CanceledScope(cs.scopeId, interruption) transformWith(cl)(view) case _ => // all other cases insert interruption cause