From 3972d1f514eaf9638766eceb008a6a233f84939e Mon Sep 17 00:00:00 2001 From: "Diego E. Alonso-Blas" Date: Thu, 23 Jan 2020 00:29:49 +0100 Subject: [PATCH] A Stream can Pull anything: change Stream to `FreeC[F, O, IAny]` A `Stream[F, O]` is defined as a wrap on a `FreeC[F, O, Unit]`. The Unit type indicates, in types, that the "Result" type of the FreeC computation that would yield the stream is of no consequence: a Stream is a complete FreeC on its own, and its result is of no matter. We can also express that idea, in types, by marking that return type as an `Any`, the top type, which is another way to indicate that no useful information can come off the Result of the inner FreeC. One benefit is that we do not need spurious conversions on the FreeC object just to convert it back-and-forth a Stream: Any thing will do. To avoid problems with the Any poly-kindness, and much like with the INothing alias, we use a special IAny alias of Any for the types. --- core/shared/src/main/scala/fs2/Pull.scala | 7 +- core/shared/src/main/scala/fs2/Stream.scala | 42 ++++++------ .../main/scala/fs2/concurrent/Signal.scala | 2 +- core/shared/src/main/scala/fs2/fs2.scala | 6 ++ .../src/main/scala/fs2/internal/Algebra.scala | 64 +++++++++---------- .../src/test/scala/fs2/CompilationTest.scala | 4 +- .../src/test/scala/fs2/StreamSpec.scala | 2 +- io/src/main/scala/fs2/io/file/file.scala | 8 +-- .../reactivestreams/StreamSubscription.scala | 2 +- 9 files changed, 68 insertions(+), 69 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 100427eb46..59f5605126 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -38,13 +38,10 @@ final class Pull[+F[_], +O, +R] private[fs2] (private val free: FreeC[F, O, R]) /** * Interpret this `Pull` to produce a `Stream`. * - * May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly + * May only be called on pulls which return a `Unit` result type. Use `p.stream` to explicitly * ignore the result type of the pull. */ - def stream(implicit ev: R <:< Unit): Stream[F, O] = { - val _ = ev - new Stream(free.asInstanceOf[FreeC[F, O, Unit]]) - } + def stream: Stream[F, O] = new Stream(free) /** Applies the resource of this pull to `f` and returns the result. */ def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] = diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 53db347b11..fda2db089a 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -127,7 +127,7 @@ import scala.concurrent.duration._ * @hideImplicitConversion PureOps * @hideImplicitConversion IdOps **/ -final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) extends AnyVal { +final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, IAny]) extends AnyVal { /** * Appends `s2` to the end of this stream. @@ -1176,7 +1176,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) f(hd(0)).free case _ => - def go(idx: Int): FreeC[F2, O2, Unit] = + def go(idx: Int): FreeC[F2, O2, IAny] = if (idx == hd.size) new Stream(tl).flatMap(f).free else { f(hd(idx)).free.transformWith { @@ -1651,7 +1651,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) * Creates a scope that may be interrupted by calling scope#interrupt. */ def interruptScope[F2[x] >: F[x]: Concurrent]: Stream[F2, O] = - new Stream(Algebra.interruptScope(free: FreeC[F2, O, Unit])) + new Stream(Algebra.interruptScope(free: FreeC[F2, O, IAny])) /** * Emits the specified separator between every pair of elements in the source stream. @@ -2490,7 +2490,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) def scanChunksOpt[S, O2 >: O, O3]( init: S )(f: S => Option[Chunk[O2] => (S, Chunk[O3])]): Stream[F, O3] = - this.pull.scanChunksOpt(init)(f).void.stream + this.pull.scanChunksOpt(init)(f).stream /** * Alias for `map(f).scanMonoid`. @@ -2657,7 +2657,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) * res0: List[Int] = List(0, 1, 2, 3, 4) * }}} */ - def take(n: Long): Stream[F, O] = this.pull.take(n).void.stream + def take(n: Long): Stream[F, O] = this.pull.take(n).stream /** * Emits the last `n` elements of the input. @@ -2684,7 +2684,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) * }}} */ def takeThrough(p: O => Boolean): Stream[F, O] = - this.pull.takeThrough(p).void.stream + this.pull.takeThrough(p).stream /** * Emits the longest prefix of the input for which all elements test true according to `f`. @@ -2695,7 +2695,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) * }}} */ def takeWhile(p: O => Boolean, takeFailure: Boolean = false): Stream[F, O] = - this.pull.takeWhile(p, takeFailure).void.stream + this.pull.takeWhile(p, takeFailure).stream /** * Transforms this stream using the given `Pipe`. @@ -2822,7 +2822,6 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) case None => k2(Right(that)) } - .void .stream } @@ -3659,7 +3658,6 @@ object Stream extends StreamLowPriority { val (o, sOpt) = f(s) Pull.output1(o) >> Pull.pure(sOpt) }(s) - .void .stream /** Like [[unfoldLoop]], but takes an effectful function. */ @@ -3670,7 +3668,6 @@ object Stream extends StreamLowPriority { case (o, sOpt) => Pull.output1(o) >> Pull.pure(sOpt) } )(s) - .void .stream /** Provides syntax for streams that are invariant in `F` and `O`. */ @@ -3679,7 +3676,7 @@ object Stream extends StreamLowPriority { /** Provides syntax for streams that are invariant in `F` and `O`. */ final class InvariantOps[F[_], O] private[Stream] ( - private val free: FreeC[F, O, Unit] + private val free: FreeC[F, O, IAny] ) extends AnyVal { private def self: Stream[F, O] = new Stream(free) @@ -3784,7 +3781,7 @@ object Stream extends StreamLowPriority { def repeatPull[O2]( using: Stream.ToPull[F, O] => Pull[F, O2, Option[Stream[F, O]]] ): Stream[F, O2] = - Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).void.stream + Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).stream } /** Provides syntax for pure streams. */ @@ -3792,7 +3789,7 @@ object Stream extends StreamLowPriority { new PureOps(s.free) /** Provides syntax for pure streams. */ - final class PureOps[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal { + final class PureOps[O] private[Stream] (private val free: FreeC[Pure, O, IAny]) extends AnyVal { private def self: Stream[Pure, O] = new Stream[Pure, O](free) /** Alias for covary, to be able to write `Stream.empty[X]`. */ @@ -3823,7 +3820,7 @@ object Stream extends StreamLowPriority { new PureTo(s.free) /** Provides `to` syntax for pure streams. */ - final class PureTo[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal { + final class PureTo[O] private[Stream] (private val free: FreeC[Pure, O, IAny]) extends AnyVal { private def self: Stream[Pure, O] = new Stream[Pure, O](free) /** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */ @@ -3836,7 +3833,7 @@ object Stream extends StreamLowPriority { new IdOps(s.free) /** Provides syntax for pure pipes based on `cats.Id`. */ - final class IdOps[O] private[Stream] (private val free: FreeC[Id, O, Unit]) extends AnyVal { + final class IdOps[O] private[Stream] (private val free: FreeC[Id, O, IAny]) extends AnyVal { private def idToApplicative[F[_]: Applicative]: Id ~> F = new (Id ~> F) { def apply[A](a: Id[A]) = a.pure[F] } @@ -3848,7 +3845,7 @@ object Stream extends StreamLowPriority { new FallibleOps(s.free) /** Provides syntax for fallible streams. */ - final class FallibleOps[O] private[Stream] (private val free: FreeC[Fallible, O, Unit]) + final class FallibleOps[O] private[Stream] (private val free: FreeC[Fallible, O, IAny]) extends AnyVal { private def self: Stream[Fallible, O] = new Stream(free) @@ -3880,7 +3877,7 @@ object Stream extends StreamLowPriority { new FallibleTo(s.free) /** Provides `to` syntax for fallible streams. */ - final class FallibleTo[O] private[Stream] (private val free: FreeC[Fallible, O, Unit]) + final class FallibleTo[O] private[Stream] (private val free: FreeC[Fallible, O, IAny]) extends AnyVal { private def self: Stream[Fallible, O] = new Stream(free) @@ -3891,7 +3888,7 @@ object Stream extends StreamLowPriority { /** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */ final class ToPull[F[_], O] private[Stream] ( - private val free: FreeC[F, O, Unit] + private val free: FreeC[F, O, IAny] ) extends AnyVal { private def self: Stream[F, O] = new Stream(free) @@ -4029,7 +4026,7 @@ object Stream extends StreamLowPriority { } /** Writes all inputs to the output of the returned `Pull`. */ - def echo: Pull[F, O, Unit] = new Pull(free) + def echo: Pull[F, O, IAny] = new Pull(free) /** Reads a single element from the input and emits it to the output. */ def echo1: Pull[F, O, Option[Stream[F, O]]] = @@ -4262,7 +4259,7 @@ object Stream extends StreamLowPriority { } object Compiler extends LowPrioCompiler { - private def compile[F[_], O, B](stream: FreeC[F, O, Unit], init: B)( + private def compile[F[_], O, B](stream: FreeC[F, O, IAny], init: B)( f: (B, Chunk[O]) => B )(implicit F: Sync[F]): F[B] = F.bracketCase(CompileScope.newRoot[F])(scope => @@ -4305,7 +4302,7 @@ object Stream extends StreamLowPriority { /** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to an `F[...]`. */ final class CompileOps[F[_], G[_], O] private[Stream] ( - private val free: FreeC[F, O, Unit] + private val free: FreeC[F, O, IAny] )(implicit compiler: Compiler[F, G]) { private def self: Stream[F, O] = new Stream(free) @@ -4616,7 +4613,7 @@ object Stream extends StreamLowPriority { final class StepLeg[F[_], O]( val head: Chunk[O], private[fs2] val scopeId: Token, - private[fs2] val next: FreeC[F, O, Unit] + private[fs2] val next: FreeC[F, O, IAny] ) { self => /** @@ -4631,7 +4628,6 @@ object Stream extends StreamLowPriority { .loop[F, O, StepLeg[F, O]] { leg => Pull.output(leg.head).flatMap(_ => leg.stepLeg) }(self.setHead(Chunk.empty)) - .void .stream /** Replaces head of this leg. Useful when the head was not fully consumed. */ diff --git a/core/shared/src/main/scala/fs2/concurrent/Signal.scala b/core/shared/src/main/scala/fs2/concurrent/Signal.scala index 329537f85f..f9b38ab511 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Signal.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Signal.scala @@ -74,7 +74,7 @@ object Signal extends SignalLowPriorityImplicits { Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)): Pull[F, PullOutput, Unit] } } yield () - firstPull.value.void.stream + firstPull.value.stream .flatMap { case (x, y, restOfXs, restOfYs) => restOfXs.either(restOfYs).scan((x, y)) { diff --git a/core/shared/src/main/scala/fs2/fs2.scala b/core/shared/src/main/scala/fs2/fs2.scala index a744ea24da..61bbad01b4 100644 --- a/core/shared/src/main/scala/fs2/fs2.scala +++ b/core/shared/src/main/scala/fs2/fs2.scala @@ -34,4 +34,10 @@ package object fs2 { * Alias for `Nothing` which works better with type inference. */ type INothing <: Nothing + + /** + * Alias for `Any` which may work better with type inference. + */ + type IAny >: Any + } diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index 4e80e04235..2cc5d79f35 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -31,7 +31,7 @@ private[fs2] object Algebra { * @param stream Stream to step * @param scopeId If scope has to be changed before this step is evaluated, id of the scope must be supplied */ - final case class Step[X](stream: FreeC[Any, X, Unit], scope: Option[Token]) + final case class Step[X](stream: FreeC[Any, X, IAny], scope: Option[Token]) extends FreeC.Eval[PureK, INothing, Option[(Chunk[X], Token, FreeC[Any, X, Unit])]] { /* NOTE: The use of `Any` and `PureK` done to by-pass an error in Scala 2.12 type-checker, * that produces a crash when dealing with Higher-Kinded GADTs in which the F parameter appears @@ -71,7 +71,7 @@ private[fs2] object Algebra { def stepLeg[F[_], O](leg: Stream.StepLeg[F, O]): FreeC[F, Nothing, Option[Stream.StepLeg[F, O]]] = Step[O](leg.next, Some(leg.scopeId)).map { _.map { - case (h, id, t) => new Stream.StepLeg[F, O](h, id, t.asInstanceOf[FreeC[F, O, Unit]]) + case (h, id, t) => new Stream.StepLeg[F, O](h, id, t.asInstanceOf[FreeC[F, O, IAny]]) } } @@ -79,7 +79,7 @@ private[fs2] object Algebra { * Wraps supplied pull in new scope, that will be opened before this pull is evaluated * and closed once this pull either finishes its evaluation or when it fails. */ - def scope[F[_], O](s: FreeC[F, O, Unit]): FreeC[F, O, Unit] = + def scope[F[_], O](s: FreeC[F, O, IAny]): FreeC[F, O, IAny] = scope0(s, None) /** @@ -87,14 +87,14 @@ private[fs2] object Algebra { * Note that this may fail with `Interrupted` when interruption occurred */ private[fs2] def interruptScope[F[_], O]( - s: FreeC[F, O, Unit] - )(implicit F: Concurrent[F]): FreeC[F, O, Unit] = + s: FreeC[F, O, IAny] + )(implicit F: Concurrent[F]): FreeC[F, O, IAny] = scope0(s, Some(F)) private def scope0[F[_], O]( - s: FreeC[F, O, Unit], + s: FreeC[F, O, IAny], interruptible: Option[Concurrent[F]] - ): FreeC[F, O, Unit] = + ): FreeC[F, O, IAny] = OpenScope(interruptible).flatMap { scopeId => s.transformWith { case Result.Pure(_) => CloseScope(scopeId, interruptedScope = None, ExitCase.Completed) @@ -115,17 +115,17 @@ private[fs2] object Algebra { } def translate[F[_], G[_], O]( - s: FreeC[F, O, Unit], + s: FreeC[F, O, IAny], u: F ~> G - )(implicit G: TranslateInterrupt[G]): FreeC[G, O, Unit] = + )(implicit G: TranslateInterrupt[G]): FreeC[G, O, IAny] = translate0[F, G, O](u, s, G.concurrentInstance) - def uncons[F[_], X, O](s: FreeC[F, O, Unit]): FreeC[F, X, Option[(Chunk[O], FreeC[F, O, Unit])]] = - Step(s, None).map { _.map { case (h, _, t) => (h, t.asInstanceOf[FreeC[F, O, Unit]]) } } + def uncons[F[_], X, O](s: FreeC[F, O, IAny]): FreeC[F, X, Option[(Chunk[O], FreeC[F, O, IAny])]] = + Step(s, None).map { _.map { case (h, _, t) => (h, t.asInstanceOf[FreeC[F, O, IAny]]) } } /** Left-folds the output of a stream. */ def compile[F[_], O, B]( - stream: FreeC[F, O, Unit], + stream: FreeC[F, O, IAny], scope: CompileScope[F], extendLastTopLevelScope: Boolean, init: B @@ -161,22 +161,22 @@ private[fs2] object Algebra { private[this] def compileLoop[F[_], O]( scope: CompileScope[F], extendLastTopLevelScope: Boolean, - stream: FreeC[F, O, Unit] + stream: FreeC[F, O, IAny] )( implicit F: MonadError[F, Throwable] - ): F[Option[(Chunk[O], CompileScope[F], FreeC[F, O, Unit])]] = { + ): F[Option[(Chunk[O], CompileScope[F], FreeC[F, O, IAny])]] = { case class Done[X](scope: CompileScope[F]) extends R[X] - case class Out[X](head: Chunk[X], scope: CompileScope[F], tail: FreeC[F, X, Unit]) extends R[X] + case class Out[X](head: Chunk[X], scope: CompileScope[F], tail: FreeC[F, X, IAny]) extends R[X] case class Interrupted[X](scopeId: Token, err: Option[Throwable]) extends R[X] sealed trait R[X] def go[X]( scope: CompileScope[F], extendedTopLevelScope: Option[CompileScope[F]], - stream: FreeC[F, X, Unit] + stream: FreeC[F, X, IAny] ): F[R[X]] = stream.viewL match { - case _: FreeC.Result.Pure[Unit] => + case _: FreeC.Result.Pure[_] => F.pure(Done(scope)) case failed: FreeC.Result.Fail => @@ -188,7 +188,7 @@ private[fs2] object Algebra { case other => sys.error(s"Unexpected interruption context: $other (compileLoop)") } - case view: ViewL.View[F, X, y, Unit] => + case view: ViewL.View[F, X, y, _] => def resume(res: Result[y]): F[R[X]] = go[X](scope, extendedTopLevelScope, view.next(res)) @@ -206,13 +206,13 @@ private[fs2] object Algebra { F.pure(Out(output.values, scope, view.next(FreeC.Result.unit))) ) - case u: Step[y] => + case u: Step[_] => // if scope was specified in step, try to find it, otherwise use the current scope. F.flatMap(u.scope.fold[F[Option[CompileScope[F]]]](F.pure(Some(scope))) { scopeId => scope.findStepScope(scopeId) }) { case Some(stepScope) => - val stepStream = u.stream.asInstanceOf[FreeC[F, y, Unit]] + val stepStream = u.stream.asInstanceOf[FreeC[F, y, IAny]] F.flatMap(F.attempt(go[y](stepScope, extendedTopLevelScope, stepStream))) { case Right(Done(scope)) => interruptGuard(scope)( @@ -224,7 +224,7 @@ private[fs2] object Algebra { val nextScope = if (u.scope.isEmpty) outScope else scope val result = Result.Pure(Some((head, outScope.id, tail))) interruptGuard(nextScope)( - go(nextScope, extendedTopLevelScope, view.next(result)) + go(nextScope, extendedTopLevelScope, view.next(result.asInstanceOf[Result[y]])) ) case Right(Interrupted(scopeId, err)) => @@ -363,12 +363,12 @@ private[fs2] object Algebra { * @return */ def interruptBoundary[F[_], O]( - stream: FreeC[F, O, Unit], + stream: FreeC[F, O, IAny], interruptedScope: Token, interruptedError: Option[Throwable] - ): FreeC[F, O, Unit] = + ): FreeC[F, O, IAny] = stream.viewL match { - case _: FreeC.Result.Pure[Unit] => + case _: FreeC.Result.Pure[_] => Result.Interrupted(interruptedScope, interruptedError) case failed: FreeC.Result.Fail => Result.Fail( @@ -380,7 +380,7 @@ private[fs2] object Algebra { // impossible Result.Interrupted(interrupted.context, interrupted.deferredError) - case view: ViewL.View[F, O, _, Unit] => + case view: ViewL.View[F, O, _, IAny] => view.step match { case close: CloseScope => CloseScope( @@ -397,9 +397,9 @@ private[fs2] object Algebra { private def translate0[F[_], G[_], O]( fK: F ~> G, - stream: FreeC[F, O, Unit], + stream: FreeC[F, O, IAny], concurrent: Option[Concurrent[G]] - ): FreeC[G, O, Unit] = { + ): FreeC[G, O, IAny] = { def translateAlgEffect[R](self: AlgEffect[F, R]): AlgEffect[G, R] = self match { // safe to cast, used in translate only // if interruption has to be supported concurrent for G has to be passed @@ -411,11 +411,11 @@ private[fs2] object Algebra { case g: GetScope[_] => g } - def translateStep[X](next: FreeC[F, X, Unit], isMainLevel: Boolean): FreeC[G, X, Unit] = + def translateStep[X](next: FreeC[F, X, IAny], isMainLevel: Boolean): FreeC[G, X, IAny] = next.viewL match { - case result: Result[Unit] => result + case result: Result[_] => result - case view: ViewL.View[F, X, y, Unit] => + case view: ViewL.View[F, X, y, _] => view.step match { case output: Output[X] => output.transformWith { @@ -426,7 +426,7 @@ private[fs2] object Algebra { // Cast is safe here, as at this point the evaluation of this Step will end // and the remainder of the free will be passed as a result in Bind. As such // next Step will have this to evaluate, and will try to translate again. - view.next(r).asInstanceOf[FreeC[G, X, Unit]] + view.next(r).asInstanceOf[FreeC[G, X, IAny]] case r @ Result.Fail(_) => translateStep(view.next(r), isMainLevel) @@ -437,7 +437,7 @@ private[fs2] object Algebra { // NOTE: The use of the `asInstanceOf` is to by-pass a compiler-crash in Scala 2.12, // involving GADTs with a covariant Higher-Kinded parameter. Step[x]( - stream = translateStep[x](step.stream.asInstanceOf[FreeC[F, x, Unit]], false), + stream = translateStep[x](step.stream.asInstanceOf[FreeC[F, x, IAny]], false), scope = step.scope ).transformWith { r => translateStep[X](view.next(r.asInstanceOf[Result[y]]), isMainLevel) diff --git a/core/shared/src/test/scala/fs2/CompilationTest.scala b/core/shared/src/test/scala/fs2/CompilationTest.scala index 59b9793448..7d1bf20b0b 100644 --- a/core/shared/src/test/scala/fs2/CompilationTest.scala +++ b/core/shared/src/test/scala/fs2/CompilationTest.scala @@ -13,8 +13,8 @@ object ThisModuleShouldCompile { /* Some checks that `.pull` can be used without annotations */ Stream(1, 2, 3, 4).through(_.take(2)) Stream.eval(IO.pure(1)).through(_.take(2)) - Stream(1, 2, 3).covary[IO].pull.uncons1.void.stream - Stream.eval(IO.pure(1)).pull.uncons1.void.stream + Stream(1, 2, 3).covary[IO].pull.uncons1.stream + Stream.eval(IO.pure(1)).pull.uncons1.stream /* Also in a polymorphic context. */ def a[F[_], A](s: Stream[F, A]) = s.through(_.take(2)) diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index f63598009e..6ca0028ace 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -3443,7 +3443,7 @@ class StreamSpec extends Fs2Spec { } "7 - ok to translate step leg that is forced back in to a stream" in { - def goStep(step: Option[Stream.StepLeg[Function0, Int]]): Pull[Function0, Int, Unit] = + def goStep(step: Option[Stream.StepLeg[Function0, Int]]): Pull[Function0, Int, IAny] = step match { case None => Pull.done case Some(step) => diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 3faf8fc5b5..5b9e457067 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -24,7 +24,7 @@ package object file { chunkSize: Int ): Stream[F, Byte] = Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => - cursor.readAll(chunkSize).void.stream + cursor.readAll(chunkSize).stream } /** @@ -40,7 +40,7 @@ package object file { end: Long ): Stream[F, Byte] = Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => - cursor.seek(start).readUntil(chunkSize, end).void.stream + cursor.seek(start).readUntil(chunkSize, end).stream } /** @@ -61,7 +61,7 @@ package object file { pollDelay: FiniteDuration = 1.second ): Stream[F, Byte] = Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => - cursor.seek(offset).tail(chunkSize, pollDelay).void.stream + cursor.seek(offset).tail(chunkSize, pollDelay).stream } /** @@ -77,7 +77,7 @@ package object file { in => Stream .resource(WriteCursor.fromPath(path, blocker, flags)) - .flatMap(_.writeAll(in).void.stream) + .flatMap(_.writeAll(in).stream) /** * Writes all data to a sequence of files, each limited in size to `limit`. diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala index 4da15d2b3d..eaaaff4d7b 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala @@ -32,7 +32,7 @@ private[reactivestreams] final class StreamSubscription[F[_], A]( def unsafeStart(): Unit = { def subscriptionPipe: Pipe[F, A, A] = in => { - def go(s: Stream[F, A]): Pull[F, A, Unit] = + def go(s: Stream[F, A]): Pull[F, A, IAny] = Pull.eval(requests.dequeue1).flatMap { case Infinite => s.pull.echo case Finite(n) =>