
Publisher.toStream can demand for more than one element #2666

Merged: 14 commits, Oct 9, 2021
@@ -69,15 +69,19 @@ final class StreamSubscriber[F[_], A](
sub.onError(t)
}

def stream(subscribe: F[Unit]): Stream[F, A] = sub.stream(subscribe)
def streamChunk(subscribe: F[Unit]): Stream[F, Chunk[A]] = sub.stream(subscribe)
def stream(subscribe: F[Unit]): Stream[F, A] = streamChunk(subscribe).unchunks

Member:
Echoing @diesalbla's comment here -- I think we should offer just the stream(...): Stream[F, A] method and let folks manually call .chunks if they want to operate on chunks explicitly.
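
A brief usage sketch of that suggested shape (illustrative only): it assumes cats-effect IO, the fs2.interop.reactivestreams package this diff lives in, and a placeholder somePublisher; callers who want chunk-level processing simply call .chunks on the element-level stream.

// Sketch only: `somePublisher` is a placeholder for a Publisher[Int] obtained elsewhere.
import cats.effect.IO
import fs2.{Chunk, Stream}
import fs2.interop.reactivestreams._
import org.reactivestreams.Publisher

def elements(somePublisher: Publisher[Int]): Stream[IO, Int] =
  somePublisher.toStream[IO]        // the single element-level entry point

def asChunks(somePublisher: Publisher[Int]): Stream[IO, Chunk[Int]] =
  somePublisher.toStream[IO].chunks // opt in to chunk-level processing explicitly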

private def nonNull[B](b: B): Unit = if (b == null) throw new NullPointerException()
}

object StreamSubscriber {

def apply[F[_]: Async, A](bufferSize: Long): F[StreamSubscriber[F, A]] =

Member:
The bufferSize should be an Int here, as Chunk is Int-based.

fsm[F, A](bufferSize).map(new StreamSubscriber(_))

def apply[F[_]: Async, A]: F[StreamSubscriber[F, A]] =
fsm[F, A].map(new StreamSubscriber(_))
apply(bufferSize = 10L)

Contributor Author:
This changes the previous behavior of requesting elements one by one.
The value is also a magic number, picked after a quick look at the memory vs. performance trade-off.
Should we handle that differently? Use 1L to keep the previous behavior? Add some configuration?

Member:
What's the behavior if we request 10 but only 5 are available?

This does feel a bit too magical IMO -- I lean towards setting it to 1 or an arbitrary power of 2 :)

Contributor Author:
If only 5 are available, then we build a chunk of 5 elements and it's sent in the OnComplete step: https://github.com/typelevel/fs2/pull/2666/files#diff-493b2533a887032dfbf8ea09dca7201413e124b5f45732ad6a877b8d7f69ba2cR178-R180

Setting it to 1 by default can lead to bad performance. I'd suggest picking an arbitrary number that isn't too high, to avoid buffering too many elements in memory. 16?

Contributor:
I would strongly recommend not changing existing behavior in source-compatible ways

Member:
Yep, agreed. Let's stick with 1 here, as that's what everyone has used for years.

Contributor Author:
Good point.
I see two possible options:

  • option 1: we set the default buffer size to 1. No surprise for existing users, but possibly a surprise for new users struggling with bad performance.
  • option 2: we change the API so it emits a compilation warning (see #2666 (comment)). Existing users will get a compilation warning, drawing their attention to this particular change.
    • 2.a) compilation warning + keep the existing behavior with a buffer size of 1
    • 2.b) compilation warning + use a buffer size of 16

My two cents from my experience as a new user: I'd expect FS2 to be fast by default, not slow by default unless I understand the API deeply and tune it.
I'm not the maintainer of this lib, so I'll let you decide.

Member:
1 versus 10 versus 16 could make a performance difference in certain types of streams but not in others, where folks would need 100 or 1000 or more. Hence, I think it's best if we make no assumptions at all. So let's go with deprecating the old API and forcing the buffer-size decision onto users.

Contributor Author:
4ad2bbb keeps the existing behavior and adds deprecations.
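
A hedged sketch of that resolution, reusing the signatures as they appear in this diff (StreamSubscriber.apply taking a bufferSize: Long); the object and method names below are illustrative placeholders, not the merged API.

// Illustrative only: the new entry point takes an explicit buffer size, while the
// old parameterless shape keeps the historical demand of 1 and is deprecated so
// existing users get a compile-time nudge rather than a silent behavior change.
import cats.effect.Async
import fs2.Stream
import fs2.interop.reactivestreams.StreamSubscriber
import org.reactivestreams.Publisher

object FromPublisherSketch {
  def buffered[F[_]: Async, A](p: Publisher[A], bufferSize: Long): Stream[F, A] =
    Stream
      .eval(StreamSubscriber[F, A](bufferSize))
      .flatMap(s => s.stream(Async[F].delay(p.subscribe(s))))

  @deprecated("Pass an explicit bufferSize instead", "3.1.4")
  def legacy[F[_]: Async, A](p: Publisher[A]): Stream[F, A] =
    buffered(p, bufferSize = 1L) // old one-by-one behavior, now with a warning
}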


@deprecated("Use apply method without dispatcher instead", "3.1.4")
def apply[F[_]: Async, A](dispatcher: Dispatcher[F]): F[StreamSubscriber[F, A]] =
@@ -102,19 +106,23 @@ object StreamSubscriber {
def onFinalize: F[Unit]

/** producer for downstream */
def dequeue1: F[Either[Throwable, Option[A]]]
def dequeue1: F[Either[Throwable, Option[Chunk[A]]]]

/** downstream stream */
def stream(subscribe: F[Unit])(implicit ev: ApplicativeError[F, Throwable]): Stream[F, A] =
def stream(
subscribe: F[Unit]
)(implicit ev: ApplicativeError[F, Throwable]): Stream[F, Chunk[A]] =

Member:
Change the return type here to Stream[F, A].

Stream.bracket(subscribe)(_ => onFinalize) >> Stream
.eval(dequeue1)
.repeat
.rethrow
.unNoneTerminate
}

private[reactivestreams] def fsm[F[_], A](implicit F: Async[F]): F[FSM[F, A]] = {
type Out = Either[Throwable, Option[A]]
private[reactivestreams] def fsm[F[_], A](
bufferSize: Long
)(implicit F: Async[F]): F[FSM[F, A]] = {
type Out = Either[Throwable, Option[Chunk[A]]]

sealed trait Input
case class OnSubscribe(s: Subscription) extends Input
@@ -126,9 +134,10 @@

sealed trait State
case object Uninitialized extends State
case class Idle(sub: Subscription) extends State
case class Idle(sub: Subscription, buffer: Vector[A]) extends State

Member:
Change Vector[A] to Chunk[A] and use ++ to build up the buffer.
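
A hedged sketch of what that accumulation could look like as a standalone helper (not the FSM code in this diff; bufferSize is taken as an Int, per the earlier review comment).

// Illustrative only: mirrors the OnNext case below, but holds the buffer as a
// Chunk[A] and grows it via ++, so no Vector -> Chunk conversion is needed on emit.
import fs2.Chunk

final case class BufferStep[A](buffer: Chunk[A], toEmit: Option[Chunk[A]])

def onNextStep[A](buffer: Chunk[A], a: A, bufferSize: Int): BufferStep[A] = {
  val newBuffer = buffer ++ Chunk.singleton(a)  // build up the buffer with ++
  if (newBuffer.size >= bufferSize)
    BufferStep(Chunk.empty[A], Some(newBuffer)) // full: hand the chunk downstream
  else
    BufferStep(newBuffer, None)                 // not full yet: keep accumulating
}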

case class RequestBeforeSubscription(req: Out => Unit) extends State
case class WaitingOnUpstream(sub: Subscription, elementRequest: Out => Unit) extends State
case class WaitingOnUpstream(sub: Subscription, buffer: Vector[A], elementRequest: Out => Unit)

Member:
Same here, change buffer to Chunk[A]

extends State
case object UpstreamCompletion extends State
case object DownstreamCancellation extends State
case class UpstreamError(err: Throwable) extends State
@@ -142,8 +151,9 @@
def step(in: Input): State => (State, () => Unit) =
in match {
case OnSubscribe(s) => {
case RequestBeforeSubscription(req) => WaitingOnUpstream(s, req) -> (() => s.request(1))
case Uninitialized => Idle(s) -> (() => ())
case RequestBeforeSubscription(req) =>
WaitingOnUpstream(s, Vector.empty, req) -> (() => s.request(bufferSize))
case Uninitialized => Idle(s, Vector.empty) -> (() => ())
case o =>
val err = new Error(s"received subscription in invalid state [$o]")
o -> { () =>
@@ -152,31 +162,44 @@
}
}
case OnNext(a) => {
case WaitingOnUpstream(s, r) => Idle(s) -> (() => r(a.some.asRight))
case DownstreamCancellation => DownstreamCancellation -> (() => ())
case WaitingOnUpstream(s, buffer, r) =>
val newBuffer = buffer :+ a
if (newBuffer.size == bufferSize) {
val chunk = Chunk.vector(newBuffer)
Idle(s, Vector.empty) -> (() => r(chunk.some.asRight))
} else
WaitingOnUpstream(s, newBuffer, r) -> (() => ())
case DownstreamCancellation => DownstreamCancellation -> (() => ())
case o =>
o -> (() => reportFailure(new Error(s"received record [$a] in invalid state [$o]")))
}
case OnComplete => {
case WaitingOnUpstream(_, r) => UpstreamCompletion -> (() => r(None.asRight))
case _ => UpstreamCompletion -> (() => ())
case WaitingOnUpstream(_, buffer, r) =>
if (buffer.nonEmpty) {
val chunk = Chunk.vector(buffer)
UpstreamCompletion -> (() => r(chunk.some.asRight))
} else {
UpstreamCompletion -> (() => r(None.asRight))
}
case _ => UpstreamCompletion -> (() => ())
}
case OnError(e) => {
case WaitingOnUpstream(_, r) => UpstreamError(e) -> (() => r(e.asLeft))
case _ => UpstreamError(e) -> (() => ())
case WaitingOnUpstream(_, _, r) => UpstreamError(e) -> (() => r(e.asLeft))
case _ => UpstreamError(e) -> (() => ())
}
case OnFinalize => {
case WaitingOnUpstream(sub, r) =>
case WaitingOnUpstream(sub, _, r) =>
DownstreamCancellation -> { () =>
sub.cancel()
r(None.asRight)
}
case Idle(sub) => DownstreamCancellation -> (() => sub.cancel())
case o => o -> (() => ())
case Idle(sub, _) => DownstreamCancellation -> (() => sub.cancel())
case o => o -> (() => ())
}
case OnDequeue(r) => {
case Uninitialized => RequestBeforeSubscription(r) -> (() => ())
case Idle(sub) => WaitingOnUpstream(sub, r) -> (() => sub.request(1))
case Uninitialized => RequestBeforeSubscription(r) -> (() => ())
case Idle(sub, buffer) =>
WaitingOnUpstream(sub, buffer, r) -> (() => sub.request(bufferSize))
case err @ UpstreamError(e) => err -> (() => r(e.asLeft))
case UpstreamCompletion => UpstreamCompletion -> (() => r(None.asRight))
case o => o -> (() => r(new Error(s"received request in invalid state [$o]").asLeft))
@@ -196,8 +219,8 @@ object StreamSubscriber {
def onError(t: Throwable): Unit = nextState(OnError(t))
def onComplete(): Unit = nextState(OnComplete)
def onFinalize: F[Unit] = F.delay(nextState(OnFinalize))
def dequeue1: F[Either[Throwable, Option[A]]] =
F.async_[Either[Throwable, Option[A]]] { cb =>
def dequeue1: F[Either[Throwable, Option[Chunk[A]]]] =
F.async_[Either[Throwable, Option[Chunk[A]]]] { cb =>
nextState(OnDequeue(out => cb(Right(out))))
}
}
@@ -45,17 +45,30 @@ import org.reactivestreams._
*/
package object reactivestreams {

/** Creates a lazy stream from an `org.reactivestreams.Publisher`.
*
* The publisher only receives a subscriber when the stream is run.
*/
def fromPublisher[F[_]: Async, A](p: Publisher[A], bufferSize: Long): Stream[F, Chunk[A]] =
Stream
.eval(StreamSubscriber[F, A](bufferSize))
.flatMap(s => s.sub.stream(Sync[F].delay(p.subscribe(s))))

/** Creates a lazy stream from an `org.reactivestreams.Publisher`.
*
* The publisher only receives a subscriber when the stream is run.
*/
def fromPublisher[F[_]: Async, A](p: Publisher[A]): Stream[F, A] =
Stream
.eval(StreamSubscriber[F, A])
.flatMap(s => s.sub.stream(Sync[F].delay(p.subscribe(s))))
.flatMap(s => s.sub.stream(Sync[F].delay(p.subscribe(s))).unchunks)

implicit final class PublisherOps[A](val publisher: Publisher[A]) extends AnyVal {

/** Creates a lazy stream from an `org.reactivestreams.Publisher` */
def toStreamChunk[F[_]: Async](bufferSize: Long): Stream[F, Chunk[A]] =
fromPublisher(publisher, bufferSize)

/** Creates a lazy stream from an `org.reactivestreams.Publisher` */
def toStream[F[_]: Async]: Stream[F, A] =
fromPublisher(publisher)
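
A short usage sketch of the two entry points as they stand in this diff, assuming a cats-effect IO runtime; intPublisher is a placeholder for a Publisher[Int] obtained from some reactive-streams library.

// Usage sketch only; `intPublisher` stands in for a real Publisher[Int].
import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}
import fs2.interop.reactivestreams._
import org.reactivestreams.Publisher

object FromPublisherExample extends IOApp.Simple {
  def intPublisher: Publisher[Int] = ??? // placeholder

  // Chunk-level view: request up to 16 elements from upstream at a time.
  val chunked: Stream[IO, Chunk[Int]] = intPublisher.toStreamChunk[IO](bufferSize = 16L)

  // Element-level view, as before.
  val elementwise: Stream[IO, Int] = intPublisher.toStream[IO]

  def run: IO[Unit] =
    chunked.evalMap(c => IO.println(s"received a chunk of ${c.size} elements")).compile.drain
}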