
Publisher.toStream can demand for more than one element #2666

Merged: 14 commits into typelevel:main on Oct 9, 2021

Conversation

@yanns (Contributor) commented Oct 5, 2021

Context: https://discord.com/channels/632277896739946517/632310980449402880/894663878133317722
Asking for more than one element at a time allows much better performance.
For example, when subscribing to a Mongo publisher, we can use the open cursor with far fewer queries.

The change was tried with the following app: https://github.com/yanns/mongo-fs2/

@yanns force-pushed the buffer_subscriber branch from 605b55a to 301a575 on October 5, 2021 18:49
    def apply[F[_]: Async, A]: F[StreamSubscriber[F, A]] =
-     fsm[F, A].map(new StreamSubscriber(_))
+     apply(bufferSize = 10L)
Contributor Author

This changes the previous behavior of asking for elements one by one.
The value is also a magic number, coming from a quick think about the memory-vs-performance trade-off.
Should we handle that differently? Use 1L to keep the previous behavior? Make it configurable?

Member

What's the behavior if requesting 10 but there's only 5 available?

This does feel a bit too magical IMO -- I lean towards setting it to 1 or an arbitrary power of 2 :)

Contributor Author

If only 5 are available, then we build a chunk of 5 elements, and it is sent in the onComplete step: https://github.com/typelevel/fs2/pull/2666/files#diff-493b2533a887032dfbf8ea09dca7201413e124b5f45732ad6a877b8d7f69ba2cR178-R180

Setting it to 1 by default can lead to bad performance by default. I'd suggest picking an arbitrary number, not too high, to avoid buffering too many elements in memory. 16?
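The trade-off under discussion can be made concrete with a toy model (plain Scala, not fs2 code; `requestsNeeded` is an illustrative name): count how many `request(n)` round trips a subscriber needs to consume a fixed number of elements for a given demand size.

```scala
object DemandDemo {
  // One request(n) call per buffer refill: ceil(total / bufferSize).
  def requestsNeeded(total: Int, bufferSize: Int): Int =
    (total + bufferSize - 1) / bufferSize

  def main(args: Array[String]): Unit = {
    println(requestsNeeded(1000, 1))  // one round trip per element
    println(requestsNeeded(1000, 16)) // roughly 16x fewer round trips
  }
}
```

With a remote source like a Mongo cursor, each round trip is a network exchange, which is why demand of 1 is slow by default.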

Contributor

I would strongly recommend not silently changing existing behavior in source-compatible ways

Member

Yep, agreed, let's stick with 1 here as that's what everyone has used for years.

Contributor Author

Good point.
I see 2 possible options:

  • option 1: we set the default buffer size to 1. No surprise for existing users; maybe a surprise for new users struggling with bad performance.
  • option 2: we change the API to emit a compilation warning (see #2666 (comment)). Existing users will get a compilation warning, drawing their attention to this particular change.
    • 2.a) compilation warning + keep the existing behavior with a buffer size of 1
    • 2.b) compilation warning + a buffer size of 16

My 2 cents from my experience as a new user: I'd expect FS2 to be fast by default, not slow by default until I understand the API deeply enough to change it.
I'm not the maintainer of this lib, so I'll let you decide.

Member

1 versus 10 versus 16 could make a performance difference in certain types of streams but not in others, where folks would need 100 or 1000 or more. Hence, I think it's best if we make no assumptions at all. So let's go with deprecating the old API and forcing the buffer size decision onto users.

Contributor Author

4ad2bbb keeps the existing behavior and adds deprecations.

@diesalbla (Contributor)

@yanns Thanks for the Pull Request! If the current integration with reactive-streams is forcing consumers to go item-by-item, this should help performance.

One small question that may have been discussed but that I missed: how does the use of Chunk in the signatures help? After all, streams are always chunked, but this is usually kept as an internal representation. Most of the code in fs2 types streams by their elements A, and streams of chunks are used for connecting with (internal) topics, queues, or channels.

@@ -69,15 +69,19 @@ final class StreamSubscriber[F[_], A](
        sub.onError(t)
    }

-   def stream(subscribe: F[Unit]): Stream[F, A] = sub.stream(subscribe)
+   def streamChunk(subscribe: F[Unit]): Stream[F, Chunk[A]] = sub.stream(subscribe)
+   def stream(subscribe: F[Unit]): Stream[F, A] = streamChunk(subscribe).unchunks
Member

Echoing @diesalbla's comment here -- I think we should offer just the stream(...): Stream[F, A] method and let folks manually call .chunks if they want to operate on chunks explicitly.
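To make the relationship concrete with a plain-Scala analogy (standard collections only, no fs2 types): a chunked view can always be flattened back to its elements, which is what `.unchunks` does for a `Stream[F, Chunk[A]]`, so exposing only the element-level `stream` loses no information.

```scala
object ChunkViewDemo {
  def main(args: Array[String]): Unit = {
    // A "stream of chunks", modeled as a list of vectors.
    val chunked: List[Vector[Int]] = List(Vector(1, 2, 3), Vector(4, 5))
    // Flattening recovers the element-level view.
    val elements: List[Int] = chunked.flatten
    println(elements) // List(1, 2, 3, 4, 5)
  }
}
```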

Member

@mpilquist left a comment

Looks great -- some minor comments


  private def nonNull[B](b: B): Unit = if (b == null) throw new NullPointerException()
}

object StreamSubscriber {

  def apply[F[_]: Async, A](bufferSize: Long): F[StreamSubscriber[F, A]] =
Member

The bufferSize should be an Int here as Chunk is Int based.

-   def stream(subscribe: F[Unit])(implicit ev: ApplicativeError[F, Throwable]): Stream[F, A] =
+   def stream(
+       subscribe: F[Unit]
+   )(implicit ev: ApplicativeError[F, Throwable]): Stream[F, Chunk[A]] =
Member

Change the return type here to Stream[F, A]

@@ -126,9 +134,10 @@ object StreamSubscriber {

    sealed trait State
    case object Uninitialized extends State
-   case class Idle(sub: Subscription) extends State
+   case class Idle(sub: Subscription, buffer: Vector[A]) extends State
Member

Change Vector[A] to Chunk[A] and use ++ to build up the buffer.

    case class RequestBeforeSubscription(req: Out => Unit) extends State
-   case class WaitingOnUpstream(sub: Subscription, elementRequest: Out => Unit) extends State
+   case class WaitingOnUpstream(sub: Subscription, buffer: Vector[A], elementRequest: Out => Unit)
Member

Same here, change buffer to Chunk[A]

@yanns (Contributor Author) commented Oct 6, 2021

One question about backwards compatibility: for the moment, I've gone with duplicating the API, like:

    // new API
    def toStream[F[_]: Async](bufferSize: Int): Stream[F, A] =
      fromPublisher(publisher, bufferSize)

    // current API
    def toStream[F[_]: Async]: Stream[F, A] =
      fromPublisher(publisher)

I could also go with extending the current API, and using default values like:

    def toStream[F[_]: Async](bufferSize: Int = StreamSubscriber.DefaultBufferSize): Stream[F, A] =
      fromPublisher(publisher, bufferSize)

Calling .toStream as before would still work, but would emit a warning, as one should now write .toStream().
On the plus side, it would remove some code.
What are your thoughts on this?

Contributor

@notxcain left a comment

🚀

@diesalbla (Contributor)

@yanns That solution would be mostly source-compatible. However, we are also keen on binary compatibility, that is, compatibility of the compiled bytecode across versions. The current version:

    // current API
    def toStream[F[_]: Async]: Stream[F, A] =
      fromPublisher(publisher)

In the bytecode, we have a method toStream with one parameter, of type cats.effect.Async. In contrast, a Scala method with a default parameter:

    def toStream[F[_]: Async](bufferSize: Int = StreamSubscriber.DefaultBufferSize): Stream[F, A] =
      fromPublisher(publisher, bufferSize)

is compiled to a single method with two parameters in the bytecode, not to two methods with and without the parameter. Default values are inserted by the compiler into the caller's bytecode, as arguments to the two-parameter method. However, the existing one-parameter method would then be missing from the new bytecode. As a consequence, callers linked against the old method would fail when run with the new bytecode.

Thus, I would recommend keeping the existing method, but adding a deprecation notice to it.
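The default-parameter point can be checked directly with reflection; a minimal, self-contained sketch (class and method names here are illustrative, not fs2's):

```scala
object DefaultParamDemo {
  class Api {
    def toStream(bufferSize: Int = 16): String = s"buffer=$bufferSize"
  }

  def main(args: Array[String]): Unit = {
    // The bytecode contains `toStream(Int)` plus a synthetic
    // `toStream$default$1` computing the default argument -- but no
    // zero-parameter `toStream`, so callers compiled against an old
    // zero-parameter signature fail to link.
    val names = classOf[Api].getDeclaredMethods.map(_.getName).toSet
    println(names.toList.sorted.mkString(", "))
  }
}
```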

Deprecate API where the buffer size is not given.
@@ -57,6 +67,11 @@ package object reactivestreams {
  implicit final class PublisherOps[A](val publisher: Publisher[A]) extends AnyVal {

    /** Creates a lazy stream from an `org.reactivestreams.Publisher` */
+   def toStream[F[_]: Async](bufferSize: Int): Stream[F, A] =
Contributor

@yanns we probably need some docs here explaining how buffer size affects performance and what value to choose.

Contributor Author

What do you think of 357dbc0?

Contributor

Perfect.

@yanns (Contributor Author) commented Oct 6, 2021

Can someone approve the workflow to check if all the tests pass?

@yanns (Contributor Author) commented Oct 6, 2021

We now have a breaking change:

[error] fs2-reactive-streams: Failed binary compatibility check against co.fs2:fs2-reactive-streams_2.12:3.1.4! Found 1 potential problems
[error]  * extension method toStream$extension(org.reactivestreams.Publisher,cats.effect.kernel.Async)fs2.Stream in object fs2.interop.reactivestreams.package#PublisherOps does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.interop.reactivestreams.package#PublisherOps.toStream$extension")

@yanns (Contributor Author) commented Oct 6, 2021

Can someone check if it's ok to exclude this: e618c8d ?
(and trigger the workflow again, thanks!)

@yanns (Contributor Author) commented Oct 7, 2021

@mpilquist I think all the points you mentioned have been addressed. Can you check again when you can?

@mpilquist (Member)

Are we sure that the mima exclusion can safely be suppressed? It doesn't look so to me.

@mpilquist (Member)

I confirmed this change is not binary compatible.

Compiling this program against 3.1.4 and then running against this PR results in the following linkage error:

    package test

    import fs2._, fs2.interop.reactivestreams._
    import cats.effect._

    object App extends IOApp.Simple {
      def run = {
        val upstream: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
        val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = upstream.toUnicastPublisher
        val downstream: Stream[IO, Int] = Stream.resource(publisher).flatMap(_.toStream[IO])
        downstream.foreach(IO.println).compile.drain
      }
    }
➜  ./target/universal/stage/bin/fs2test
java.lang.NoSuchMethodError: 'fs2.Stream fs2.interop.reactivestreams.package$PublisherOps$.toStream$extension(org.reactivestreams.Publisher, cats.effect.kernel.Async)'
	at test.App$.$anonfun$run$1(app.scala:10)
	at fs2.Stream.$anonfun$flatMap$1(Stream.scala:1152)
	at fs2.Pull$FlatMapR$1.go$2(Pull.scala:1000)
	at fs2.Pull$FlatMapR$1.unconsed(Pull.scala:1008)
	at fs2.Pull$FlatMapR$1.out(Pull.scala:1017)
	at fs2.Pull$ViewRunner$1.outLoop$1(Pull.scala:928)
	at fs2.Pull$ViewRunner$1.out(Pull.scala:930)
	at fs2.Pull$.$anonfun$compile$24(Pull.scala:1175)
	at fs2.Pull$.$anonfun$compile$2(Pull.scala:903)
	at cats.effect.IOFiber.next$2(IOFiber.scala:361)
	at cats.effect.IOFiber.runLoop(IOFiber.scala:372)
	at cats.effect.IOFiber.execR(IOFiber.scala:1169)
	at cats.effect.IOFiber.run(IOFiber.scala:128)
	at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:410)
Exception in thread "main" java.lang.NoSuchMethodError: 'fs2.Stream fs2.interop.reactivestreams.package$PublisherOps$.toStream$extension(org.reactivestreams.Publisher, cats.effect.kernel.Async)'
	[same stack trace as above]

@yanns (Contributor Author) commented Oct 7, 2021

@mpilquist Thanks for checking! I don't really understand why adding a new method with the same name breaks binary compatibility. If someone understands, I'd be glad to learn about it.

I've removed the mima exclusion: b6e1847
And I've renamed the new API: 30df37d
mimaReportBinaryIssues is happy about this.

If you have better suggestions, please feel free to comment.

@AL333Z (Contributor) commented Oct 7, 2021

Definitely not a binary compatibility expert here, but the diffs in the bytecode https://editor.mergely.com/Pq7KTAXv/ were showing a new bufferSize arg (around line 30 and following).
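For anyone puzzled by the `toStream$extension` name in the mima report: methods of an `AnyVal` implicit class are compiled to static `name$extension` methods on a synthetic companion object, so changing such a method's parameter list (or removing it) changes a public JVM signature. A self-contained sketch, with made-up names:

```scala
object ExtDemo {
  implicit class RichInt(val self: Int) extends AnyVal {
    def twice: Int = self * 2
  }

  def main(args: Array[String]): Unit = {
    // The real work of `twice` lives in a static `twice$extension`
    // method on the synthetic companion class `ExtDemo$RichInt$`,
    // which is the kind of name mima reported as missing.
    val names = Class.forName("ExtDemo$RichInt$").getDeclaredMethods.map(_.getName)
    println(names.contains("twice$extension"))
  }
}
```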

@yanns (Contributor Author) commented Oct 7, 2021

Can someone trigger the workflow?

@yanns (Contributor Author) commented Oct 8, 2021

@mpilquist the binary issue is now fixed. Can you review again?

@mpilquist mpilquist merged commit eb39fe4 into typelevel:main Oct 9, 2021
@yanns yanns deleted the buffer_subscriber branch October 9, 2021 12:44
@yanns (Contributor Author) commented Feb 11, 2023

I've given a little talk about this PR: https://yanns.github.io/blog/2021/12/21/fs2-mongo-reactivestreams/

6 participants