Skip to content

Commit

Permalink
Merge pull request #2798 from bplommer/covary
Browse files Browse the repository at this point in the history
Remove unneeded uses of `covary`
  • Loading branch information
mpilquist authored Jan 23, 2022
2 parents 99ea811 + 92b3ef9 commit 6d57997
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class ConcurrentBenchmark {
val each = Stream
.range(0, 1000)
.map(i => Stream.eval(IO.pure(i)))
.covary[IO]
each.parJoin(concurrent).compile.last.unsafeRunSync().get
}
}
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/Compiler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ object Compiler extends CompilerLowPriority {
)(foldChunk: (B, Chunk[O]) => B): B =
Compiler
.target[SyncIO]
.apply(stream.covary[SyncIO], init)(foldChunk)
.apply(stream, init)(foldChunk)
.unsafeRunSync()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class StreamConcurrentlySuite extends Fs2Suite {

Stream
.bracket(IO.unit)(_ => finRef.update(_ :+ "Outer"))
.flatMap(_ => s.covary[IO].concurrently(runner))
.flatMap(_ => s.concurrently(runner))
.interruptWhen(halt.get.attempt)
.compile
.drain
Expand Down
29 changes: 10 additions & 19 deletions core/shared/src/test/scala/fs2/StreamInterruptSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class StreamInterruptSuite extends Fs2Suite {
Stream
.eval(Semaphore[IO](0))
.flatMap { semaphore =>
s.covary[IO].evalMap(_ => semaphore.acquire).interruptWhen(interruptSoon)
s.evalMap(_ => semaphore.acquire).interruptWhen(interruptSoon)
}
.compile
.toList
Expand All @@ -52,7 +52,7 @@ class StreamInterruptSuite extends Fs2Suite {
.eval(Semaphore[IO](0))
.flatMap { semaphore =>
val interrupt = Stream.emit(true) ++ Stream.exec(semaphore.release)
s.covary[IO].evalMap(_ => semaphore.acquire).interruptWhen(interrupt)
s.evalMap(_ => semaphore.acquire).interruptWhen(interrupt)
}
.compile
.toList
Expand All @@ -66,7 +66,6 @@ class StreamInterruptSuite extends Fs2Suite {
val interruptSoon = Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.covary[IO]
.interruptWhen(interruptSoon)
.compile
.drain
Expand All @@ -78,7 +77,6 @@ class StreamInterruptSuite extends Fs2Suite {
Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.covary[IO]
.interruptWhen(interrupt)
.flatMap(_ => Stream.emit(1))
.compile
Expand All @@ -91,7 +89,7 @@ class StreamInterruptSuite extends Fs2Suite {
Stream.sleep_[IO](20.millis).compile.drain.attempt

def loop(i: Int): Stream[IO, Int] =
Stream.emit(i).covary[IO].flatMap(i => Stream.emit(i) ++ loop(i + 1))
Stream.emit(i).flatMap(i => Stream.emit(i) ++ loop(i + 1))

loop(0)
.interruptWhen(interrupt)
Expand All @@ -116,7 +114,7 @@ class StreamInterruptSuite extends Fs2Suite {

test("7 - interruption of an infinitely recursive stream that never emits and has no eval") {
val interrupt = Stream.sleep_[IO](20.millis).compile.drain.attempt
def loop: Stream[IO, Int] = Stream.emit(()).covary[IO] >> loop
def loop: Stream[IO, Int] = Stream.emit(()) >> loop
loop
.interruptWhen(interrupt)
.compile
Expand All @@ -141,7 +139,6 @@ class StreamInterruptSuite extends Fs2Suite {
Stream
.constant(true)
.dropWhile(!_)
.covary[IO]
.interruptWhen(interrupt)
.compile
.drain
Expand All @@ -166,7 +163,7 @@ class StreamInterruptSuite extends Fs2Suite {
.flatMap { barrier =>
Stream.eval(Semaphore[IO](0)).flatMap { enableInterrupt =>
val interrupt = Stream.eval(enableInterrupt.acquire) >> Stream.emit(false)
s.covary[IO]
s
.evalMap { i =>
// enable interruption and hang when hitting a value divisible by 7
if (i % 7 == 0) enableInterrupt.release.flatMap(_ => barrier.acquire.as(i))
Expand All @@ -192,7 +189,7 @@ class StreamInterruptSuite extends Fs2Suite {
Stream
.eval(Semaphore[IO](0))
.flatMap { semaphore =>
s.covary[IO].interruptWhen(interrupt) >> Stream.exec(semaphore.acquire)
s.interruptWhen(interrupt) >> Stream.exec(semaphore.acquire)
}
.compile
.toList
Expand All @@ -202,7 +199,6 @@ class StreamInterruptSuite extends Fs2Suite {

test("12a - minimal interruption of stream that never terminates in flatMap") {
Stream(1)
.covary[IO]
.interruptWhen(IO.sleep(10.millis).attempt)
.flatMap(_ => Stream.eval(IO.never))
.compile
Expand All @@ -219,7 +215,6 @@ class StreamInterruptSuite extends Fs2Suite {
.flatMap { semaphore =>
Stream(1)
.append(s)
.covary[IO]
.interruptWhen(interrupt.covaryOutput[Boolean])
.flatMap(_ => Stream.exec(semaphore.acquire))
}
Expand All @@ -244,8 +239,7 @@ class StreamInterruptSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Int]) =>
val expected = s.toList
val interrupt = IO.sleep(50.millis).attempt
s.covary[IO]
.interruptWhen(interrupt)
s.interruptWhen(interrupt)
.evalMap(_ => IO.never)
.drain
.append(s)
Expand All @@ -259,8 +253,7 @@ class StreamInterruptSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Int]) =>
val expected = s.toList
val interrupt = IO.sleep(50.millis).attempt
s.covary[IO]
.interruptWhen(interrupt)
s.interruptWhen(interrupt)
.evalMap(_ => IO.never.as(None))
.append(s.map(Some(_)))
.collect { case Some(v) => v }
Expand All @@ -274,8 +267,7 @@ class StreamInterruptSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Int]) =>
val expected = s.toList
val interrupt = Stream.sleep_[IO](20.millis).compile.drain.attempt
s.covary[IO]
.append(Stream(1))
s.append(Stream(1))
.interruptWhen(interrupt)
.map(_ => None)
.append(s.map(Some(_)))
Expand Down Expand Up @@ -332,7 +324,6 @@ class StreamInterruptSuite extends Fs2Suite {

test("18 - resume with append after evalMap interruption") {
Stream(1)
.covary[IO]
.interruptWhen(IO.sleep(50.millis).attempt)
.evalMap(_ => IO.never)
.append(Stream(5))
Expand Down Expand Up @@ -362,7 +353,7 @@ class StreamInterruptSuite extends Fs2Suite {
.flatMap { semaphore =>
val interrupt = IO.sleep(50.millis).attempt
val neverInterrupt = (IO.never: IO[Unit]).attempt
s.covary[IO]
s
.interruptWhen(interrupt)
.as(None)
.append(s.map(Option(_)))
Expand Down
8 changes: 4 additions & 4 deletions core/shared/src/test/scala/fs2/StreamObserveSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ class StreamObserveSuite extends Fs2Suite {
test("basic functionality") {
forAllF { (s: Stream[Pure, Int]) =>
IO.ref(0).flatMap { sum =>
observer(s.covary[IO])(_.foreach(i => sum.update(_ + i))).compile.foldMonoid
observer(s)(_.foreach(i => sum.update(_ + i))).compile.foldMonoid
.flatMap(out => sum.get.assertEquals(out))
}
}
}

test("handle errors from observing sink") {
forAllF { (s: Stream[Pure, Int]) =>
observer(s.covary[IO])(_ => Stream.raiseError[IO](new Err)).attempt.compile.toList
observer(s)(_ => Stream.raiseError[IO](new Err)).attempt.compile.toList
.map { result =>
assertEquals(result.size, 1)
assert(
Expand Down Expand Up @@ -78,7 +78,7 @@ class StreamObserveSuite extends Fs2Suite {
group("handle finite observing sink") {
test("1") {
forAllF { (s: Stream[Pure, Int]) =>
observer(s.covary[IO])(_ => Stream.empty).compile.toList.assertEquals(Nil)
observer(s)(_ => Stream.empty).compile.toList.assertEquals(Nil)
}
}
test("2") {
Expand All @@ -93,7 +93,7 @@ class StreamObserveSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Int]) =>
val sink: Pipe[IO, Int, INothing] = _.foreach(_ => IO.unit)

observer(observer(s.covary[IO])(sink))(sink).compile.toList
observer(observer(s)(sink))(sink).compile.toList
.assertEquals(s.toList)
}
}
Expand Down
7 changes: 3 additions & 4 deletions core/shared/src/test/scala/fs2/StreamParJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class StreamParJoinSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Int]) =>
val expected = s.toList
s.covary[IO]
.map(Stream.emit(_).covary[IO])
.map(Stream.emit(_))
.parJoin(1)
.compile
.toList
Expand All @@ -49,7 +49,7 @@ class StreamParJoinSuite extends Fs2Suite {
val n = (n0 % 20).abs + 1
val expected = s.toList.toSet
s.covary[IO]
.map(Stream.emit(_).covary[IO])
.map(Stream.emit(_))
.parJoin(n)
.compile
.toList
Expand All @@ -62,8 +62,7 @@ class StreamParJoinSuite extends Fs2Suite {
forAllF { (s: Stream[Pure, Stream[Pure, Int]], n0: Int) =>
val n = (n0 % 20).abs + 1
val expected = s.flatten.toList.toSet
s.map(_.covary[IO])
.covary[IO]
s.covary[IO]
.parJoin(n)
.compile
.toList
Expand Down
16 changes: 4 additions & 12 deletions core/shared/src/test/scala/fs2/StreamSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class StreamSuite extends Fs2Suite {
Stream
.eval(SyncIO[Int](throw new Err))
.map(Right(_): Either[Throwable, Int])
.handleErrorWith(t => Stream.emit(Left(t)).covary[SyncIO])
.handleErrorWith(t => Stream.emit(Left(t)))
.take(1)
.compile
.toVector
Expand Down Expand Up @@ -214,7 +214,7 @@ class StreamSuite extends Fs2Suite {
}

test("7 - parJoin") {
Stream(Stream.emit(1).covary[IO], Stream.raiseError[IO](new Err), Stream.emit(2).covary[IO])
Stream(Stream.emit(1), Stream.raiseError[IO](new Err), Stream.emit(2))
.covary[IO]
.parJoin(4)
.attempt
Expand All @@ -232,7 +232,6 @@ class StreamSuite extends Fs2Suite {
Counter[IO].flatMap { counter =>
Pull
.pure(42)
.covary[IO]
.handleErrorWith(_ => Pull.eval(counter.increment))
.flatMap(_ => Pull.raiseError[IO](new Err))
.stream
Expand Down Expand Up @@ -300,7 +299,6 @@ class StreamSuite extends Fs2Suite {
Counter[IO].flatMap { counter =>
Stream
.range(0, 10)
.covary[IO]
.append(Stream.raiseError[IO](new Err))
.handleErrorWith(_ => Stream.eval(counter.increment))
.compile
Expand All @@ -311,7 +309,6 @@ class StreamSuite extends Fs2Suite {
test("14") {
Stream
.range(0, 3)
.covary[SyncIO]
.append(Stream.raiseError[SyncIO](new Err))
.chunkLimit(1)
.unchunks
Expand All @@ -325,11 +322,8 @@ class StreamSuite extends Fs2Suite {

test("15") {
Counter[IO].flatMap { counter =>
{
Stream
.range(0, 3)
.covary[IO] ++ Stream.raiseError[IO](new Err)
}.chunkLimit(1)
(Stream.range(0, 3) ++ Stream.raiseError[IO](new Err))
.chunkLimit(1)
.unchunks
.pull
.echo
Expand Down Expand Up @@ -473,7 +467,6 @@ class StreamSuite extends Fs2Suite {
Stream
.emit(1)
.append(Stream.raiseError[IO](new Err))
.covary[IO]
.compile
.drain
.intercept[Err]
Expand All @@ -484,7 +477,6 @@ class StreamSuite extends Fs2Suite {
.emit(1)
.append(Stream.raiseError[IO](new Err))
.take(1)
.covary[IO]
.compile
.drain
}
Expand Down
8 changes: 3 additions & 5 deletions core/shared/src/test/scala/fs2/StreamSwitchMapSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class StreamSwitchMapSuite extends Fs2Suite {
Stream
.eval(Semaphore[IO](1))
.flatMap { guard =>
s.covary[IO]
.evalTap(_ => guard.acquire) // wait for inner to emit to prevent switching
s.evalTap(_ => guard.acquire) // wait for inner to emit to prevent switching
.onFinalize(guard.acquire) // outer terminates, wait for last inner to emit
.switchMap(x => Stream.emit(x).onFinalize(guard.release))
}
Expand All @@ -52,7 +51,7 @@ class StreamSwitchMapSuite extends Fs2Suite {
Stream
.eval(Ref[IO].of(true))
.flatMap { ref =>
s.covary[IO].switchMap { _ =>
s.switchMap { _ =>
Stream.eval(ref.get).flatMap { released =>
if (!released) Stream.raiseError[IO](new Err)
else
Expand All @@ -70,8 +69,7 @@ class StreamSwitchMapSuite extends Fs2Suite {
test("when primary stream terminates, inner stream continues") {
forAllF { (s1: Stream[Pure, Int], s2: Stream[Pure, Int]) =>
val expected = s1.last.unNoneTerminate.flatMap(s => s2 ++ Stream(s)).toList
s1.covary[IO]
.switchMap(s => Stream.sleep_[IO](25.millis) ++ s2 ++ Stream.emit(s))
s1.switchMap(s => Stream.sleep_[IO](25.millis) ++ s2 ++ Stream.emit(s))
.compile
.toList
.assertEquals(expected)
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/test/scala/fs2/TimedPullsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class TimedPullsSuite extends Fs2Suite {
val timeout = 500.millis
val t = 600.millis
val n = 10L
val s = Stream.constant(1).covary[IO].metered(t).take(n)
val s = Stream.constant(1).metered[IO](t).take(n)
val expected = Stream("timeout", "elem").repeat.take(n * 2).compile.toList

TestControl
Expand Down
3 changes: 0 additions & 3 deletions core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class TopicSuite extends Fs2Suite {
val publisher =
Stream
.range(0, count)
.covary[IO]
.through(topic.publish)

// Ensures all subs are registered before consuming
Expand Down Expand Up @@ -85,7 +84,6 @@ class TopicSuite extends Fs2Suite {
val publisher =
Stream
.range(0, count)
.covary[IO]
.through(topic.publish)

// Ensures all subs are registered before consuming
Expand Down Expand Up @@ -127,7 +125,6 @@ class TopicSuite extends Fs2Suite {

val publisher = Stream.sleep[IO](1.second) ++ Stream
.range(0, count)
.covary[IO]
.evalTap(i => signal.set(i))
.through(topic.publish)
val subscriber = topic
Expand Down
3 changes: 1 addition & 2 deletions io/js/src/test/scala/fs2/io/file/BaseFileSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ trait BaseFileSuite {
Stream
.range(0, 4)
.map(_.toByte)
.covary[IO]
.metered(250.millis)
.metered[IO](250.millis)
.through(Files[IO].writeAll(file, Flags.Append))

protected def deleteDirectoryRecursively(dir: Path): IO[Unit] =
Expand Down
3 changes: 1 addition & 2 deletions io/jvm/src/test/scala/fs2/io/file/BaseFileSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ trait BaseFileSuite extends Fs2Suite {
Stream
.range(0, 4)
.map(_.toByte)
.covary[IO]
.metered(250.millis)
.metered[IO](250.millis)
.through(Files[IO].writeAll(file))

protected def deleteDirectoryRecursively(dir: Path): IO[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class StreamUnicastPublisherSpec
if (n == java.lang.Long.MAX_VALUE) Stream.range(1, 20).repeat
else Stream(1).repeat.scan(1)(_ + _).map(i => if (i > n) None else Some(i)).unNoneTerminate

StreamUnicastPublisher(s.covary[IO], dispatcher)
StreamUnicastPublisher(s, dispatcher)
}

def createFailedPublisher(): FailedPublisher = new FailedPublisher()
Expand Down

0 comments on commit 6d57997

Please sign in to comment.