Skip to content

Commit

Permalink
Changed Pull#{stream,streamNoScope} to require Unit result type
Browse files Browse the repository at this point in the history
  • Loading branch information
mpilquist committed Jun 15, 2019
1 parent bcb1d1d commit 006ea6f
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 52 deletions.
2 changes: 1 addition & 1 deletion core/jvm/src/main/scala/fs2/compress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object compress {
def inflate[F[_]](nowrap: Boolean = false, bufferSize: Int = 1024 * 32)(
implicit ev: RaiseThrowable[F]): Pipe[F, Byte, Byte] =
_.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
val inflater = new Inflater(nowrap)
val buffer = new Array[Byte](bufferSize)
Expand Down
27 changes: 24 additions & 3 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,19 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
def attempt: Pull[F, O, Either[Throwable, R]] =
Pull.fromFreeC(get[F, O, R].map(r => Right(r)).handleErrorWith(t => FreeC.pure(Left(t))))

/** Interpret this `Pull` to produce a `Stream`. The result type `R` is discarded. */
def stream: Stream[F, O] =
/**
* Interpret this `Pull` to produce a `Stream`.
*
* May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly
* ignore the result type of the pull.
*/
def stream(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.scope.get[F, O, Unit])
}

// For binary compatibility with 1.0.x
private[Pull] def stream: Stream[F, O] =
Stream.fromFreeC(this.scope.get[F, O, Unit])

/**
Expand All @@ -54,7 +65,14 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
* closing but when using `streamNoScope`, they get promoted to the current stream scope,
* which may be infinite in the worst case.
*/
def streamNoScope: Stream[F, O] = Stream.fromFreeC(get[F, O, R].map(_ => ()))
def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])
}

// For binary compatibility with 1.0.x
private[Pull] def streamNoScope: Stream[F, O] =
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])

/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Expand Down Expand Up @@ -93,6 +111,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing

/** Tracks any resources acquired during this pull and releases them when the pull completes. */
def scope: Pull[F, O, Unit] = Pull.fromFreeC(Algebra.scope(get[F, O, R].map(_ => ())))

/** Discards the result type of this pull. */
def void: Pull[F, O, Unit] = as(())
}

object Pull extends PullLowPriority {
Expand Down
58 changes: 31 additions & 27 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}

this.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
if (hd.size >= n)
Pull.output1(hd) >> go(Chunk.Queue.empty, tl)
Expand Down Expand Up @@ -476,8 +476,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.pull
.find(pf.isDefinedAt)
.flatMap {
case None => Pull.pure(None)
case Some((hd, tl)) => Pull.output1(pf(hd)).as(None)
case None => Pull.done
case Some((hd, tl)) => Pull.output1(pf(hd))
}
.stream

Expand Down Expand Up @@ -676,7 +676,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.pull
.takeWhile(o => !p(o))
.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some(s) => s.drop(1).pull.echo
}
.stream
Expand Down Expand Up @@ -839,9 +839,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
def dropRight(n: Int): Stream[F, O] =
if (n <= 0) this
else {
def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Option[Unit]] =
def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Unit] =
s.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
val all = acc :+ hd
all
Expand Down Expand Up @@ -949,20 +949,20 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def evalScan[F2[x] >: F[x], O2](z: O2)(f: (O2, O) => F2[O2]): Stream[F2, O2] = {
def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Option[Stream[F2, O2]]] =
def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output1(o) >> go(o, tl)
}
case None => Pull.pure(None)
case None => Pull.done
}
this.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output(Chunk.seq(List(z, o))) >> go(o, tl)
}
case None => Pull.output1(z) >> Pull.pure(None)
case None => Pull.output1(z)
}.stream
}

Expand Down Expand Up @@ -1016,9 +1016,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def filterWithPrevious(f: (O, O) => Boolean): Stream[F, O] = {
def go(last: O, s: Stream[F, O]): Pull[F, O, Option[Unit]] =
def go(last: O, s: Stream[F, O]): Pull[F, O, Unit] =
s.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
// Check if we can emit this chunk unmodified
val (allPass, newLast) = hd.foldLeft((true, last)) {
Expand All @@ -1036,7 +1036,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}
}
this.pull.uncons1.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) => Pull.output1(hd) >> go(hd, tl)
}.stream
}
Expand Down Expand Up @@ -1518,7 +1518,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*/
def intersperse[O2 >: O](separator: O2): Stream[F, O2] =
this.pull.echo1.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some(s) =>
s.repeatPull {
_.uncons.flatMap {
Expand Down Expand Up @@ -2303,7 +2303,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*/
def scanChunksOpt[S, O2 >: O, O3](init: S)(
f: S => Option[Chunk[O2] => (S, Chunk[O3])]): Stream[F, O3] =
this.pull.scanChunksOpt(init)(f).stream
this.pull.scanChunksOpt(init)(f).void.stream

/**
* Alias for `map(f).scanMonoid`.
Expand Down Expand Up @@ -2464,7 +2464,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* res0: List[Int] = List(0, 1, 2, 3, 4)
* }}}
*/
def take(n: Long): Stream[F, O] = this.pull.take(n).stream
def take(n: Long): Stream[F, O] = this.pull.take(n).void.stream

/**
* Emits the last `n` elements of the input.
Expand All @@ -2490,7 +2490,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def takeThrough(p: O => Boolean): Stream[F, O] =
this.pull.takeThrough(p).stream
this.pull.takeThrough(p).void.stream

/**
* Emits the longest prefix of the input for which all elements test true according to `f`.
Expand All @@ -2501,7 +2501,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def takeWhile(p: O => Boolean, takeFailure: Boolean = false): Stream[F, O] =
this.pull.takeWhile(p, takeFailure).stream
this.pull.takeWhile(p, takeFailure).void.stream

/**
* Transforms this stream using the given `Pipe`.
Expand Down Expand Up @@ -2618,16 +2618,19 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}
}

covaryAll[F2, O2].pull.stepLeg.flatMap {
case Some(leg1) =>
that.pull.stepLeg
.flatMap {
case Some(leg2) => go(leg1, leg2)
case None => k1(Left((leg1.head, leg1.stream)))
}
covaryAll[F2, O2].pull.stepLeg
.flatMap {
case Some(leg1) =>
that.pull.stepLeg
.flatMap {
case Some(leg2) => go(leg1, leg2)
case None => k1(Left((leg1.head, leg1.stream)))
}

case None => k2(Right(that))
}.stream
case None => k2(Right(that))
}
.void
.stream
}

/**
Expand Down Expand Up @@ -3523,7 +3526,7 @@ object Stream extends StreamLowPriority {
*/
def repeatPull[O2](
using: Stream.ToPull[F, O] => Pull[F, O2, Option[Stream[F, O]]]): Stream[F, O2] =
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).stream
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).void.stream

}

Expand Down Expand Up @@ -4302,6 +4305,7 @@ object Stream extends StreamLowPriority {
.loop[F, O, StepLeg[F, O]] { leg =>
Pull.output(leg.head).flatMap(_ => leg.stepLeg)
}(self.setHead(Chunk.empty))
.void
.stream

/** Replaces head of this leg. Useful when the head was not fully consumed. */
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object Signal extends SignalLowPriorityImplicits {
(y, restOfYs) = firstYAndRestOfYs
_ <- OptionT.liftF(Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)))
} yield ()
firstPull.value.stream
firstPull.value.void.stream
.covaryOutput[PullOutput]
.flatMap {
case (x, y, restOfXs, restOfYs) =>
Expand Down
21 changes: 9 additions & 12 deletions core/shared/src/main/scala/fs2/text.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,20 @@ object text {
Chunk.bytes(allBytes.drop(splitAt)))
}

def doPull(
buf: Chunk[Byte],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] =
def doPull(buf: Chunk[Byte], s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] =
s.pull.uncons.flatMap {
case Some((byteChunks, tail)) =>
val (output, nextBuffer) =
byteChunks.toList.foldLeft((Nil: List[String], buf))(processSingleChunk)
Pull.output(Chunk.seq(output.reverse)) >> doPull(nextBuffer, tail)
case None if !buf.isEmpty =>
Pull.output1(new String(buf.toArray, utf8Charset)) >> Pull.pure(None)
Pull.output1(new String(buf.toArray, utf8Charset))
case None =>
Pull.pure(None)
Pull.done
}

def processByteOrderMark(
buffer: Option[Chunk.Queue[Byte]],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] =
def processByteOrderMark(buffer: Option[Chunk.Queue[Byte]],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
val newBuffer = buffer.getOrElse(Chunk.Queue.empty[Byte]) :+ hd
Expand All @@ -93,7 +90,7 @@ object text {
case Some(b) =>
doPull(Chunk.empty, Stream.emits(b.chunks))
case None =>
Pull.pure(None)
Pull.done
}
}

Expand Down Expand Up @@ -179,15 +176,15 @@ object text {

def go(buffer: Vector[String],
pendingLineFeed: Boolean,
s: Stream[F, String]): Pull[F, String, Option[Unit]] =
s: Stream[F, String]): Pull[F, String, Unit] =
s.pull.uncons.flatMap {
case Some((chunk, s)) =>
val (toOutput, newBuffer, newPendingLineFeed) =
extractLines(buffer, chunk, pendingLineFeed)
Pull.output(toOutput) >> go(newBuffer, newPendingLineFeed, s)
case None if buffer.nonEmpty =>
Pull.output1(buffer.mkString) >> Pull.pure(None)
case None => Pull.pure(None)
Pull.output1(buffer.mkString)
case None => Pull.done
}

s =>
Expand Down
16 changes: 8 additions & 8 deletions core/shared/src/test/scala/fs2/CompilationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ object ThisModuleShouldCompile {
/* Some checks that `.pull` can be used without annotations */
Stream(1, 2, 3, 4).through(_.take(2))
Stream.eval(IO.pure(1)).through(_.take(2))
Stream(1, 2, 3).covary[IO].pull.uncons1.stream
Stream.eval(IO.pure(1)).pull.uncons1.stream
Stream(1, 2, 3).covary[IO].pull.uncons1.void.stream
Stream.eval(IO.pure(1)).pull.uncons1.void.stream

/* Also in a polymorphic context. */
def a[F[_], A](s: Stream[F, A]) = s.through(_.take(2))
Expand All @@ -36,20 +36,20 @@ object ThisModuleShouldCompile {
.pull
.uncons1
.flatMap {
case Some((hd, _)) => Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.output1(hd)
case None => Pull.done
}
.stream
Stream(1, 2, 3).pull.uncons1
.flatMap {
case Some((hd, _)) => Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.output1(hd)
case None => Pull.done
}
.stream
Stream(1, 2, 3).pull.uncons1
.flatMap {
case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd)
case None => Pull.done
}
.stream
(Stream(1, 2, 3).evalMap(IO(_))): Stream[IO, Int]
Expand Down
1 change: 1 addition & 0 deletions core/shared/src/test/scala/fs2/PullSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PullSpec extends Fs2Spec {
.onFinalize(ref.set(1))
.pull
.echoChunk
.void
.stream
.compile
.toList
Expand Down

0 comments on commit 006ea6f

Please sign in to comment.