Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed Pull#{stream,streamNoScope} to require Unit result type #1512

Merged
merged 1 commit into from
Jun 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/jvm/src/main/scala/fs2/compress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object compress {
def inflate[F[_]](nowrap: Boolean = false, bufferSize: Int = 1024 * 32)(
implicit ev: RaiseThrowable[F]): Pipe[F, Byte, Byte] =
_.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
val inflater = new Inflater(nowrap)
val buffer = new Array[Byte](bufferSize)
Expand Down
27 changes: 24 additions & 3 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,19 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
def attempt: Pull[F, O, Either[Throwable, R]] =
Pull.fromFreeC(get[F, O, R].map(r => Right(r)).handleErrorWith(t => FreeC.pure(Left(t))))

/** Interpret this `Pull` to produce a `Stream`. The result type `R` is discarded. */
def stream: Stream[F, O] =
/**
* Interpret this `Pull` to produce a `Stream`.
*
* May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly
* ignore the result type of the pull.
*/
def stream(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.scope.get[F, O, Unit])
}

// For binary compatibility with 1.0.x
private[Pull] def stream: Stream[F, O] =
Stream.fromFreeC(this.scope.get[F, O, Unit])

/**
Expand All @@ -54,7 +65,14 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
* closing but when using `streamNoScope`, they get promoted to the current stream scope,
* which may be infinite in the worst case.
*/
def streamNoScope: Stream[F, O] = Stream.fromFreeC(get[F, O, R].map(_ => ()))
def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])
}

// For binary compatibility with 1.0.x
private[Pull] def streamNoScope: Stream[F, O] =
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])

/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Expand Down Expand Up @@ -93,6 +111,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing

/** Tracks any resources acquired during this pull and releases them when the pull completes. */
def scope: Pull[F, O, Unit] = Pull.fromFreeC(Algebra.scope(get[F, O, R].map(_ => ())))

/** Discards the result type of this pull. */
def void: Pull[F, O, Unit] = as(())
}

object Pull extends PullLowPriority {
Expand Down
58 changes: 31 additions & 27 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}

this.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
if (hd.size >= n)
Pull.output1(hd) >> go(Chunk.Queue.empty, tl)
Expand Down Expand Up @@ -476,8 +476,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.pull
.find(pf.isDefinedAt)
.flatMap {
case None => Pull.pure(None)
case Some((hd, tl)) => Pull.output1(pf(hd)).as(None)
case None => Pull.done
case Some((hd, tl)) => Pull.output1(pf(hd))
}
.stream

Expand Down Expand Up @@ -676,7 +676,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.pull
.takeWhile(o => !p(o))
.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some(s) => s.drop(1).pull.echo
}
.stream
Expand Down Expand Up @@ -839,9 +839,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
def dropRight(n: Int): Stream[F, O] =
if (n <= 0) this
else {
def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Option[Unit]] =
def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Unit] =
s.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
val all = acc :+ hd
all
Expand Down Expand Up @@ -949,20 +949,20 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def evalScan[F2[x] >: F[x], O2](z: O2)(f: (O2, O) => F2[O2]): Stream[F2, O2] = {
def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Option[Stream[F2, O2]]] =
def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output1(o) >> go(o, tl)
}
case None => Pull.pure(None)
case None => Pull.done
}
this.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output(Chunk.seq(List(z, o))) >> go(o, tl)
}
case None => Pull.output1(z) >> Pull.pure(None)
case None => Pull.output1(z)
}.stream
}

Expand Down Expand Up @@ -1016,9 +1016,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def filterWithPrevious(f: (O, O) => Boolean): Stream[F, O] = {
def go(last: O, s: Stream[F, O]): Pull[F, O, Option[Unit]] =
def go(last: O, s: Stream[F, O]): Pull[F, O, Unit] =
s.pull.uncons.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) =>
// Check if we can emit this chunk unmodified
val (allPass, newLast) = hd.foldLeft((true, last)) {
Expand All @@ -1036,7 +1036,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}
}
this.pull.uncons1.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some((hd, tl)) => Pull.output1(hd) >> go(hd, tl)
}.stream
}
Expand Down Expand Up @@ -1518,7 +1518,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*/
def intersperse[O2 >: O](separator: O2): Stream[F, O2] =
this.pull.echo1.flatMap {
case None => Pull.pure(None)
case None => Pull.done
case Some(s) =>
s.repeatPull {
_.uncons.flatMap {
Expand Down Expand Up @@ -2303,7 +2303,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*/
def scanChunksOpt[S, O2 >: O, O3](init: S)(
f: S => Option[Chunk[O2] => (S, Chunk[O3])]): Stream[F, O3] =
this.pull.scanChunksOpt(init)(f).stream
this.pull.scanChunksOpt(init)(f).void.stream

/**
* Alias for `map(f).scanMonoid`.
Expand Down Expand Up @@ -2464,7 +2464,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* res0: List[Int] = List(0, 1, 2, 3, 4)
* }}}
*/
def take(n: Long): Stream[F, O] = this.pull.take(n).stream
def take(n: Long): Stream[F, O] = this.pull.take(n).void.stream

/**
* Emits the last `n` elements of the input.
Expand All @@ -2490,7 +2490,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def takeThrough(p: O => Boolean): Stream[F, O] =
this.pull.takeThrough(p).stream
this.pull.takeThrough(p).void.stream

/**
* Emits the longest prefix of the input for which all elements test true according to `f`.
Expand All @@ -2501,7 +2501,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def takeWhile(p: O => Boolean, takeFailure: Boolean = false): Stream[F, O] =
this.pull.takeWhile(p, takeFailure).stream
this.pull.takeWhile(p, takeFailure).void.stream

/**
* Transforms this stream using the given `Pipe`.
Expand Down Expand Up @@ -2618,16 +2618,19 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}
}

covaryAll[F2, O2].pull.stepLeg.flatMap {
case Some(leg1) =>
that.pull.stepLeg
.flatMap {
case Some(leg2) => go(leg1, leg2)
case None => k1(Left((leg1.head, leg1.stream)))
}
covaryAll[F2, O2].pull.stepLeg
.flatMap {
case Some(leg1) =>
that.pull.stepLeg
.flatMap {
case Some(leg2) => go(leg1, leg2)
case None => k1(Left((leg1.head, leg1.stream)))
}

case None => k2(Right(that))
}.stream
case None => k2(Right(that))
}
.void
.stream
}

/**
Expand Down Expand Up @@ -3523,7 +3526,7 @@ object Stream extends StreamLowPriority {
*/
def repeatPull[O2](
using: Stream.ToPull[F, O] => Pull[F, O2, Option[Stream[F, O]]]): Stream[F, O2] =
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).stream
Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).void.stream

}

Expand Down Expand Up @@ -4302,6 +4305,7 @@ object Stream extends StreamLowPriority {
.loop[F, O, StepLeg[F, O]] { leg =>
Pull.output(leg.head).flatMap(_ => leg.stepLeg)
}(self.setHead(Chunk.empty))
.void
.stream

/** Replaces head of this leg. Useful when the head was not fully consumed. */
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object Signal extends SignalLowPriorityImplicits {
(y, restOfYs) = firstYAndRestOfYs
_ <- OptionT.liftF(Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)))
} yield ()
firstPull.value.stream
firstPull.value.void.stream
.covaryOutput[PullOutput]
.flatMap {
case (x, y, restOfXs, restOfYs) =>
Expand Down
21 changes: 9 additions & 12 deletions core/shared/src/main/scala/fs2/text.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,20 @@ object text {
Chunk.bytes(allBytes.drop(splitAt)))
}

def doPull(
buf: Chunk[Byte],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] =
def doPull(buf: Chunk[Byte], s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] =
s.pull.uncons.flatMap {
case Some((byteChunks, tail)) =>
val (output, nextBuffer) =
byteChunks.toList.foldLeft((Nil: List[String], buf))(processSingleChunk)
Pull.output(Chunk.seq(output.reverse)) >> doPull(nextBuffer, tail)
case None if !buf.isEmpty =>
Pull.output1(new String(buf.toArray, utf8Charset)) >> Pull.pure(None)
Pull.output1(new String(buf.toArray, utf8Charset))
case None =>
Pull.pure(None)
Pull.done
}

def processByteOrderMark(
buffer: Option[Chunk.Queue[Byte]],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] =
def processByteOrderMark(buffer: Option[Chunk.Queue[Byte]],
s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
val newBuffer = buffer.getOrElse(Chunk.Queue.empty[Byte]) :+ hd
Expand All @@ -93,7 +90,7 @@ object text {
case Some(b) =>
doPull(Chunk.empty, Stream.emits(b.chunks))
case None =>
Pull.pure(None)
Pull.done
}
}

Expand Down Expand Up @@ -179,15 +176,15 @@ object text {

def go(buffer: Vector[String],
pendingLineFeed: Boolean,
s: Stream[F, String]): Pull[F, String, Option[Unit]] =
s: Stream[F, String]): Pull[F, String, Unit] =
s.pull.uncons.flatMap {
case Some((chunk, s)) =>
val (toOutput, newBuffer, newPendingLineFeed) =
extractLines(buffer, chunk, pendingLineFeed)
Pull.output(toOutput) >> go(newBuffer, newPendingLineFeed, s)
case None if buffer.nonEmpty =>
Pull.output1(buffer.mkString) >> Pull.pure(None)
case None => Pull.pure(None)
Pull.output1(buffer.mkString)
case None => Pull.done
}

s =>
Expand Down
16 changes: 8 additions & 8 deletions core/shared/src/test/scala/fs2/CompilationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ object ThisModuleShouldCompile {
/* Some checks that `.pull` can be used without annotations */
Stream(1, 2, 3, 4).through(_.take(2))
Stream.eval(IO.pure(1)).through(_.take(2))
Stream(1, 2, 3).covary[IO].pull.uncons1.stream
Stream.eval(IO.pure(1)).pull.uncons1.stream
Stream(1, 2, 3).covary[IO].pull.uncons1.void.stream
Stream.eval(IO.pure(1)).pull.uncons1.void.stream

/* Also in a polymorphic context. */
def a[F[_], A](s: Stream[F, A]) = s.through(_.take(2))
Expand All @@ -36,20 +36,20 @@ object ThisModuleShouldCompile {
.pull
.uncons1
.flatMap {
case Some((hd, _)) => Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.output1(hd)
case None => Pull.done
}
.stream
Stream(1, 2, 3).pull.uncons1
.flatMap {
case Some((hd, _)) => Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.output1(hd)
case None => Pull.done
}
.stream
Stream(1, 2, 3).pull.uncons1
.flatMap {
case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd).as(None)
case None => Pull.pure(None)
case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd)
case None => Pull.done
}
.stream
(Stream(1, 2, 3).evalMap(IO(_))): Stream[IO, Int]
Expand Down
1 change: 1 addition & 0 deletions core/shared/src/test/scala/fs2/PullSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PullSpec extends Fs2Spec {
.onFinalize(ref.set(1))
.pull
.echoChunk
.void
.stream
.compile
.toList
Expand Down