Skip to content

Commit

Permalink
rename new API toStreamBuffered to avoid breaking binary compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
yanns committed Oct 7, 2021
1 parent b6e1847 commit 30df37d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.reactivestreams._
* scala>
* scala> val upstream: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
* scala> val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = upstream.toUnicastPublisher
* scala> val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap(_.toStream[IO](bufferSize = 16))
* scala> val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap(_.toStreamBuffered[IO](bufferSize = 16))
* scala>
* scala> downstream.compile.toVector.unsafeRunSync()
* res0: Vector[Int] = Vector(1, 2, 3)
Expand Down Expand Up @@ -81,12 +81,12 @@ package object reactivestreams {
* The publisher can use this `bufferSize` to query elements in batch.
* A high number will also lead to more elements in memory.
*/
def toStream[F[_]: Async](bufferSize: Int): Stream[F, A] =
def toStreamBuffered[F[_]: Async](bufferSize: Int): Stream[F, A] =
fromPublisher(publisher, bufferSize)

/** Creates a lazy stream from an `org.reactivestreams.Publisher` */
@deprecated(
"Use toStream method with a buffer size. Use a buffer size of 1 to keep the same behavior.",
"Use toStreamBuffered method instead. Use a buffer size of 1 to keep the same behavior.",
"3.1.4"
)
def toStream[F[_]: Async]: Stream[F, A] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ final class PublisherToSubscriberSpec extends Fs2Suite {
val subscriberStream =
Stream
.resource(Stream.emits(ints).covary[IO].toUnicastPublisher)
.flatMap(_.toStream[IO](bufferSize))
.flatMap(_.toStreamBuffered[IO](bufferSize))

assert(subscriberStream.compile.toVector.unsafeRunSync() == (ints.toVector))
}
Expand All @@ -46,7 +46,7 @@ final class PublisherToSubscriberSpec extends Fs2Suite {
// TODO unsafeRunSync hangs
val input: Stream[IO, Int] = Stream(1, 2, 3) ++ Stream.raiseError[IO](TestError)
val output: Stream[IO, Int] =
Stream.resource(input.toUnicastPublisher).flatMap(_.toStream[IO](1))
Stream.resource(input.toUnicastPublisher).flatMap(_.toStreamBuffered[IO](1))

assert(output.compile.drain.attempt.unsafeRunSync() == (Left(TestError)))
}
Expand All @@ -57,7 +57,7 @@ final class PublisherToSubscriberSpec extends Fs2Suite {
val subscriberStream =
Stream
.resource(Stream.emits(as ++ bs).covary[IO].toUnicastPublisher)
.flatMap(_.toStream[IO](bufferSize).take(as.size.toLong))
.flatMap(_.toStreamBuffered[IO](bufferSize).take(as.size.toLong))

assert(subscriberStream.compile.toVector.unsafeRunSync() == (as.toVector))
}
Expand Down

0 comments on commit 30df37d

Please sign in to comment.