From c0492a01c8056278112b2adc4e95d8986e3dc182 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Thu, 13 Aug 2020 00:52:37 +0300 Subject: [PATCH] Add conversion method from ZStream to fs2.Stream --- .../scala/zio/interop/fs2StreamSpec.scala | 174 +++++++++--------- .../zio/stream/interop/FS2StreamSyntax.scala | 32 ++-- 2 files changed, 106 insertions(+), 100 deletions(-) diff --git a/interop-cats/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala b/interop-cats/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala index 23d622af..1b020ae7 100644 --- a/interop-cats/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala +++ b/interop-cats/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala @@ -12,96 +12,92 @@ import zio.random.nextIntBetween object fs2StreamSpec extends DefaultRunnableSpec { import zio.stream.interop.fs2z._ - val exception = new Exception("Failed") + val exception: Throwable = new Exception("Failed") - def spec = suite("test toZStream conversion")( - testM("simple stream")(checkM(Gen.chunkOf(Gen.anyInt)) { chunk: Chunk[Int] => - for { - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream() - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("non empty stream")(checkM(Gen.chunkOf1(Gen.anyLong)) { chunk => - for { - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream() - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("100 element stream")(checkM(Gen.chunkOfN(100)(Gen.anyLong)) { chunk => - for { - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream() - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("error propagation") { - Task.concurrentEffectWith { implicit CE => - val fs2Stream = Stream - .raiseError[Task](exception) - .toZStream() - .runCollect - .run - assertM(fs2Stream)(fails(equalTo(exception))) - } - }, - testM("releases all resources by the time the failover stream has started") { - for { - queueSize <- nextIntBetween(2, 32) - fins <- Ref.make(Chunk[Int]()) - s = Stream(1).onFinalize(fins.update(1 +: _)) >> - Stream(2).onFinalize(fins.update(2 +: _)) >> - Stream(3).onFinalize(fins.update(3 +: _)) >> - Stream.raiseError[Task](exception) + def fs2StreamFromChunk[A](chunk: Chunk[A]) = + fs2.Stream.chunk[Task, A](fs2.Chunk.indexedSeq(chunk)) + + def assertEqual[A](actual: fs2.Stream[Task, A], expected: fs2.Stream[Task, A]) = + for { + x <- actual.compile.toVector + y <- expected.compile.toVector + } yield assert(x)(equalTo(y)) - result <- s.toZStream(queueSize).drain.catchAllCause(_ => ZStream.fromEffect(fins.get)).runCollect - } yield assert(result.flatten)(equalTo(Chunk(1, 2, 3))) - }, - testM("bigger queueSize than a chunk size")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => - for { - queueSize <- nextIntBetween(32, 256) - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream(queueSize) - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("queueSize == 1")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => - for { - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream(1) - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("negative queueSize")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => - for { - queueSize <- nextIntBetween(-128, 0) - fs2Stream <- Stream - .fromIterator[Task](chunk.iterator) - .toZStream(queueSize) - .runCollect - zioStream <- ZStream.fromChunk(chunk).runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }), - testM("RIO")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => - for { - zioStream <- ZStream.fromChunk(chunk).runCollect - queueSize <- nextIntBetween(2, 128) - fs2Stream <- Stream - .fromIterator[RIO[Clock, *]](chunk.iterator) - .toZStream(queueSize) - .runCollect - } yield assert(fs2Stream)(equalTo(zioStream)) - }) + def assertEqual[R, E, A](actual: ZStream[R, E, A], expected: ZStream[R, E, A]) = + for { + x <- actual.runCollect + y <- expected.runCollect + } yield assert(x)(equalTo(y)) + + def spec = suite("zio.stream.ZStream <-> fs2.Stream")( + suite("test toFs2Stream conversion")( + testM("simple stream")(checkM(Gen.chunkOf(Gen.anyInt)) { chunk: Chunk[Int] => + assertEqual(ZStream.fromChunk(chunk).toFs2Stream, fs2StreamFromChunk(chunk)) + }), + testM("non empty stream")(checkM(Gen.chunkOf1(Gen.anyLong)) { chunk => + assertEqual(ZStream.fromChunk(chunk).toFs2Stream, fs2StreamFromChunk(chunk)) + }), + testM("100 element stream")(checkM(Gen.chunkOfN(100)(Gen.anyLong)) { chunk => + assertEqual(ZStream.fromChunk(chunk).toFs2Stream, fs2StreamFromChunk(chunk)) + }), + testM("error propagation") { + Task.concurrentEffectWith { implicit CE => + val result = ZStream.fail(exception).toFs2Stream.compile.drain.run + assertM(result)(fails(equalTo(exception))) + } + } + ), + suite("test toZStream conversion")( + testM("simple stream")(checkM(Gen.chunkOf(Gen.anyInt)) { chunk: Chunk[Int] => + assertEqual(fs2StreamFromChunk(chunk).toZStream(), ZStream.fromChunk(chunk)) + }), + testM("non empty stream")(checkM(Gen.chunkOf1(Gen.anyLong)) { chunk => + assertEqual(fs2StreamFromChunk(chunk).toZStream(), ZStream.fromChunk(chunk)) + }), + testM("100 element stream")(checkM(Gen.chunkOfN(100)(Gen.anyLong)) { chunk => + assertEqual(fs2StreamFromChunk(chunk).toZStream(), ZStream.fromChunk(chunk)) + }), + testM("error propagation") { + Task.concurrentEffectWith { implicit CE => + val result = Stream.raiseError[Task](exception).toZStream().runDrain.run + assertM(result)(fails(equalTo(exception))) + } + }, + testM("releases all resources by the time the failover stream has started") { + for { + queueSize <- nextIntBetween(2, 32) + fins <- Ref.make(Chunk[Int]()) + stream = Stream(1).onFinalize(fins.update(1 +: _)) >> + Stream(2).onFinalize(fins.update(2 +: _)) >> + Stream(3).onFinalize(fins.update(3 +: _)) >> + Stream.raiseError[Task](exception) + result <- stream.toZStream(queueSize).drain.catchAllCause(_ => ZStream.fromEffect(fins.get)).runCollect + } yield assert(result.flatten)(equalTo(Chunk(1, 2, 3))) + }, + testM("bigger queueSize than a chunk size")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => + for { + queueSize <- nextIntBetween(32, 256) + result <- assertEqual(fs2StreamFromChunk(chunk).toZStream(queueSize), ZStream.fromChunk(chunk)) + } yield result + }), + testM("queueSize == 1")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => + assertEqual(fs2StreamFromChunk(chunk).toZStream(1), ZStream.fromChunk(chunk)) + }), + testM("negative queueSize")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => + for { + queueSize <- nextIntBetween(-128, 0) + result <- assertEqual(fs2StreamFromChunk(chunk).toZStream(queueSize), ZStream.fromChunk(chunk)) + } yield result + }), + testM("RIO")(checkM(Gen.chunkOfN(10)(Gen.anyLong)) { chunk => + for { + queueSize <- nextIntBetween(2, 128) + result <- assertEqual( + fs2StreamFromChunk(chunk).covary[RIO[Clock, *]].toZStream(queueSize), + ZStream.fromChunk(chunk) + ) + } yield result + }) + ) ) } diff --git a/interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala b/interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala index edc72929..1f0ad8d9 100644 --- a/interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala +++ b/interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala @@ -2,33 +2,43 @@ package zio package stream.interop import fs2.Stream -import zio.stream.{ Take, ZStream } import zio.interop.catz._ +import zio.stream.{ Take, ZStream } -trait FS2StreamSyntax { +import scala.language.implicitConversions - import scala.language.implicitConversions +trait FS2StreamSyntax { implicit final def fs2RIOStreamSyntax[R, A](stream: Stream[RIO[R, *], A]): FS2RIOStreamSyntax[R, A] = new FS2RIOStreamSyntax(stream) + + implicit final def zStreamSyntax[R, E, A](stream: ZStream[R, E, A]): ZStreamSyntax[R, E, A] = + new ZStreamSyntax(stream) +} + +class ZStreamSyntax[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal { + + /** Convert a [[zio.stream.ZStream]] into an [[fs2.Stream]]. */ + def toFs2Stream: fs2.Stream[ZIO[R, E, *], A] = + fs2.Stream.resource(stream.process.toResourceZIO).flatMap { pull => + fs2.Stream.repeatEval(pull.optional).unNoneTerminate.flatMap { chunk => + fs2.Stream.chunk(fs2.Chunk.indexedSeq(chunk)) + } + } } final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, *], A]) { /** - * Convert a fs2.Stream into a ZStream. - * This method requires non-empty queue. + * Convert an [[fs2.Stream]] into a [[zio.stream.ZStream]]. + * This method requires a non-empty queue. * * When `queueSize` >= 2 utilizes chunks for better performance. * - * @note when possible use only power of 2 capacities; this will - * provide better performance of the queue. + * @note when possible use only power of 2 queue sizes; this will provide better performance of the queue. */ def toZStream[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = - if (queueSize > 1) - toZStreamChunk(queueSize) - else - toZStreamSingle + if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamSingle private def toZStreamSingle[R1 <: R]: ZStream[R1, Throwable, A] = ZStream.managed {