Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A Stream can pull anything. #1757

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,10 @@ final class Pull[+F[_], +O, +R] private[fs2] (private val free: FreeC[F, O, R])
/**
* Interpret this `Pull` to produce a `Stream`.
*
* May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly
* May only be called on pulls which return a `Unit` result type. Use `p.stream` to explicitly
* ignore the result type of the pull.
*/
def stream(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
new Stream(free.asInstanceOf[FreeC[F, O, Unit]])
}
def stream: Stream[F, O] = new Stream(free)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm generally 👎 on removing the R <:< Unit requirement here as requiring that evidence has exposed bugs where the remainder was not dealt with correctly.


/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Expand Down
42 changes: 19 additions & 23 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ import scala.concurrent.duration._
* @hideImplicitConversion PureOps
* @hideImplicitConversion IdOps
**/
final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) extends AnyVal {
final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, IAny]) extends AnyVal {

/**
* Appends `s2` to the end of this stream.
Expand Down Expand Up @@ -1176,7 +1176,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
f(hd(0)).free

case _ =>
def go(idx: Int): FreeC[F2, O2, Unit] =
def go(idx: Int): FreeC[F2, O2, IAny] =
if (idx == hd.size) new Stream(tl).flatMap(f).free
else {
f(hd(idx)).free.transformWith {
Expand Down Expand Up @@ -1651,7 +1651,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
* Creates a scope that may be interrupted by calling scope#interrupt.
*/
def interruptScope[F2[x] >: F[x]: Concurrent]: Stream[F2, O] =
new Stream(Algebra.interruptScope(free: FreeC[F2, O, Unit]))
new Stream(Algebra.interruptScope(free: FreeC[F2, O, IAny]))

/**
* Emits the specified separator between every pair of elements in the source stream.
Expand Down Expand Up @@ -2490,7 +2490,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
def scanChunksOpt[S, O2 >: O, O3](
init: S
)(f: S => Option[Chunk[O2] => (S, Chunk[O3])]): Stream[F, O3] =
this.pull.scanChunksOpt(init)(f).void.stream
this.pull.scanChunksOpt(init)(f).stream

/**
* Alias for `map(f).scanMonoid`.
Expand Down Expand Up @@ -2657,7 +2657,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
* res0: List[Int] = List(0, 1, 2, 3, 4)
* }}}
*/
def take(n: Long): Stream[F, O] = this.pull.take(n).void.stream
def take(n: Long): Stream[F, O] = this.pull.take(n).stream

/**
* Emits the last `n` elements of the input.
Expand All @@ -2684,7 +2684,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
* }}}
*/
def takeThrough(p: O => Boolean): Stream[F, O] =
this.pull.takeThrough(p).void.stream
this.pull.takeThrough(p).stream

/**
* Emits the longest prefix of the input for which all elements test true according to `f`.
Expand All @@ -2695,7 +2695,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
* }}}
*/
def takeWhile(p: O => Boolean, takeFailure: Boolean = false): Stream[F, O] =
this.pull.takeWhile(p, takeFailure).void.stream
this.pull.takeWhile(p, takeFailure).stream

/**
* Transforms this stream using the given `Pipe`.
Expand Down Expand Up @@ -2822,7 +2822,6 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])

case None => k2(Right(that))
}
.void
.stream
}

Expand Down Expand Up @@ -3659,7 +3658,6 @@ object Stream extends StreamLowPriority {
val (o, sOpt) = f(s)
Pull.output1(o) >> Pull.pure(sOpt)
}(s)
.void
.stream

/** Like [[unfoldLoop]], but takes an effectful function. */
Expand All @@ -3670,7 +3668,6 @@ object Stream extends StreamLowPriority {
case (o, sOpt) => Pull.output1(o) >> Pull.pure(sOpt)
}
)(s)
.void
.stream

/** Provides syntax for streams that are invariant in `F` and `O`. */
Expand All @@ -3679,7 +3676,7 @@ object Stream extends StreamLowPriority {

/** Provides syntax for streams that are invariant in `F` and `O`. */
final class InvariantOps[F[_], O] private[Stream] (
private val free: FreeC[F, O, Unit]
private val free: FreeC[F, O, IAny]
) extends AnyVal {
private def self: Stream[F, O] = new Stream(free)

Expand Down Expand Up @@ -3784,15 +3781,15 @@ object Stream extends StreamLowPriority {
def repeatPull[O2](
using: Stream.ToPull[F, O] => Pull[F, O2, Option[Stream[F, O]]]
): Stream[F, O2] =
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).void.stream
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).stream
}

/** Provides syntax for pure streams. */
implicit def PureOps[O](s: Stream[Pure, O]): PureOps[O] =
new PureOps(s.free)

/** Provides syntax for pure streams. */
final class PureOps[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal {
final class PureOps[O] private[Stream] (private val free: FreeC[Pure, O, IAny]) extends AnyVal {
private def self: Stream[Pure, O] = new Stream[Pure, O](free)

/** Alias for covary, to be able to write `Stream.empty[X]`. */
Expand Down Expand Up @@ -3823,7 +3820,7 @@ object Stream extends StreamLowPriority {
new PureTo(s.free)

/** Provides `to` syntax for pure streams. */
final class PureTo[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal {
final class PureTo[O] private[Stream] (private val free: FreeC[Pure, O, IAny]) extends AnyVal {
private def self: Stream[Pure, O] = new Stream[Pure, O](free)

/** Runs this pure stream and returns the emitted elements in a collection of the specified type. Note: this method is only available on pure streams. */
Expand All @@ -3836,7 +3833,7 @@ object Stream extends StreamLowPriority {
new IdOps(s.free)

/** Provides syntax for pure pipes based on `cats.Id`. */
final class IdOps[O] private[Stream] (private val free: FreeC[Id, O, Unit]) extends AnyVal {
final class IdOps[O] private[Stream] (private val free: FreeC[Id, O, IAny]) extends AnyVal {
private def idToApplicative[F[_]: Applicative]: Id ~> F =
new (Id ~> F) { def apply[A](a: Id[A]) = a.pure[F] }

Expand All @@ -3848,7 +3845,7 @@ object Stream extends StreamLowPriority {
new FallibleOps(s.free)

/** Provides syntax for fallible streams. */
final class FallibleOps[O] private[Stream] (private val free: FreeC[Fallible, O, Unit])
final class FallibleOps[O] private[Stream] (private val free: FreeC[Fallible, O, IAny])
extends AnyVal {
private def self: Stream[Fallible, O] = new Stream(free)

Expand Down Expand Up @@ -3880,7 +3877,7 @@ object Stream extends StreamLowPriority {
new FallibleTo(s.free)

/** Provides `to` syntax for fallible streams. */
final class FallibleTo[O] private[Stream] (private val free: FreeC[Fallible, O, Unit])
final class FallibleTo[O] private[Stream] (private val free: FreeC[Fallible, O, IAny])
extends AnyVal {
private def self: Stream[Fallible, O] = new Stream(free)

Expand All @@ -3891,7 +3888,7 @@ object Stream extends StreamLowPriority {

/** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */
final class ToPull[F[_], O] private[Stream] (
private val free: FreeC[F, O, Unit]
private val free: FreeC[F, O, IAny]
) extends AnyVal {
private def self: Stream[F, O] =
new Stream(free)
Expand Down Expand Up @@ -4029,7 +4026,7 @@ object Stream extends StreamLowPriority {
}

/** Writes all inputs to the output of the returned `Pull`. */
def echo: Pull[F, O, Unit] = new Pull(free)
def echo: Pull[F, O, IAny] = new Pull(free)

/** Reads a single element from the input and emits it to the output. */
def echo1: Pull[F, O, Option[Stream[F, O]]] =
Expand Down Expand Up @@ -4262,7 +4259,7 @@ object Stream extends StreamLowPriority {
}

object Compiler extends LowPrioCompiler {
private def compile[F[_], O, B](stream: FreeC[F, O, Unit], init: B)(
private def compile[F[_], O, B](stream: FreeC[F, O, IAny], init: B)(
f: (B, Chunk[O]) => B
)(implicit F: Sync[F]): F[B] =
F.bracketCase(CompileScope.newRoot[F])(scope =>
Expand Down Expand Up @@ -4305,7 +4302,7 @@ object Stream extends StreamLowPriority {

/** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to an `F[...]`. */
final class CompileOps[F[_], G[_], O] private[Stream] (
private val free: FreeC[F, O, Unit]
private val free: FreeC[F, O, IAny]
)(implicit compiler: Compiler[F, G]) {
private def self: Stream[F, O] =
new Stream(free)
Expand Down Expand Up @@ -4616,7 +4613,7 @@ object Stream extends StreamLowPriority {
final class StepLeg[F[_], O](
val head: Chunk[O],
private[fs2] val scopeId: Token,
private[fs2] val next: FreeC[F, O, Unit]
private[fs2] val next: FreeC[F, O, IAny]
) { self =>

/**
Expand All @@ -4631,7 +4628,6 @@ object Stream extends StreamLowPriority {
.loop[F, O, StepLeg[F, O]] { leg =>
Pull.output(leg.head).flatMap(_ => leg.stepLeg)
}(self.setHead(Chunk.empty))
.void
.stream

/** Replaces head of this leg. Useful when the head was not fully consumed. */
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object Signal extends SignalLowPriorityImplicits {
Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)): Pull[F, PullOutput, Unit]
}
} yield ()
firstPull.value.void.stream
firstPull.value.stream
.flatMap {
case (x, y, restOfXs, restOfYs) =>
restOfXs.either(restOfYs).scan((x, y)) {
Expand Down
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/fs2/fs2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ package object fs2 {
* Alias for `Nothing` which works better with type inference.
*/
type INothing <: Nothing

/**
* Alias for `Any` which may work better with type inference.
*/
type IAny >: Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to go this way, I don't think we need IAny as scalac doesn't treat Any special from type inference perspective.


}
Loading