Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change scoping behavior to open a scope on each bracket #1574

Merged
merged 4 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 2 additions & 37 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,37 +43,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
*/
def stream(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.scope.get[F, O, Unit])
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note converting a Pull to a Stream no longer inserts a scope -- this is an optional part of this PR but this always felt super arbitrary to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

}

// For binary compatibility with 1.0.x
private[Pull] def stream: Stream[F, O] =
Stream.fromFreeC(this.scope.get[F, O, Unit])

/**
* Like [[stream]] but no scope is inserted around the pull, resulting in any resources being
* promoted to the current scope of the stream, extending the resource lifetime. Typically used
* as a performance optimization, where resource lifetime can be extended in exchange for faster
* execution.
*
* Caution: this can result in resources with greatly extended lifecycles if the pull
* discards parts of the stream from which it was created. This could lead to memory leaks
* so be very careful when using this function. For example, a pull that emits the first element
* and discards the tail could discard the release of one or more resources that were acquired
* in order to produce the first element. Normally, these resources would be registered in the
* scope generated by the pull-to-stream conversion and hence released as part of that scope
* closing but when using `streamNoScope`, they get promoted to the current stream scope,
* which may be infinite in the worst case.
*/
def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])
}

// For binary compatibility with 1.0.x
private[Pull] def streamNoScope: Stream[F, O] =
Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit])

/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Pull.fromFreeC(get[F2, O2, R].flatMap(r => f(r).get))
Expand Down Expand Up @@ -109,9 +81,6 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing
h: Throwable => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Pull.fromFreeC(get[F2, O2, R2].handleErrorWith(e => h(e).get))

/** Tracks any resources acquired during this pull and releases them when the pull completes. */
def scope: Pull[F, O, Unit] = Pull.fromFreeC(Algebra.scope(get[F, O, R].map(_ => ())))

/** Discards the result type of this pull. */
def void: Pull[F, O, Unit] = as(())
}
Expand Down Expand Up @@ -143,18 +112,14 @@ object Pull extends PullLowPriority {
}

/**
* Acquire a resource within a `Pull`. The cleanup action will be run at the end
* of the `.stream` scope which executes the returned `Pull`. The acquired
* resource is returned as the result value of the pull.
* Acquire a resource within a `Pull`. The acquired resource is returned as the result value of the pull.
*/
def acquire[F[_]: RaiseThrowable, R](r: F[R])(cleanup: R => F[Unit]): Pull[F, INothing, R] =
acquireCancellable(r)(cleanup).map(_.resource)

/**
* Like [[acquire]] but the result value consists of a cancellation
* pull and the acquired resource. Running the cancellation pull frees the resource.
* This allows the acquired resource to be released earlier than at the end of the
* containing pull scope.
*/
def acquireCancellable[F[_]: RaiseThrowable, R](r: F[R])(
Copy link
Member Author

@mpilquist mpilquist Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be deleted. I'll open a follow-up PR after this one is merged. IIRC, I added it when writing a custom pull that did log rotation -- the file handles were all opened in the same scope as a result of the custom pull. At the time, I couldn't find a way to close the old file handle when opening a new one. This was back in 0.9 with a significantly different pull API though.

cleanup: R => F[Unit]): Pull[F, INothing, Cancellable[F, R]] =
Expand Down
60 changes: 26 additions & 34 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,22 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* Appends `s2` to the end of this stream.
*
* @example {{{
* scala> ( Stream(1,2,3)++Stream(4,5,6) ).toList
* scala> (Stream(1,2,3) ++ Stream(4,5,6)).toList
* res0: List[Int] = List(1, 2, 3, 4, 5, 6)
* }}}
*
* If `this` stream is not terminating, then the result is equivalent to `this`.
* If `this` stream is infinite, then the result is equivalent to `this`.
*/
def ++[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] = append(s2)

/** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */
def append[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] =
def ++[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] =
Stream.fromFreeC(get[F2, O2].transformWith {
case Result.Pure(_) => s2.get
case other => other.asFreeC[Algebra[F2, O2, ?]]
})

/** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */
def append[F2[x] >: F[x], O2 >: O](s2: => Stream[F2, O2]): Stream[F2, O2] =
this ++ s2

/**
* Alias for `_.map(_ => o2)`.
*
Expand Down Expand Up @@ -1361,7 +1362,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def handleErrorWith[F2[x] >: F[x], O2 >: O](h: Throwable => Stream[F2, O2]): Stream[F2, O2] =
Stream.fromFreeC(Algebra.scope(get[F2, O2]).handleErrorWith(e => h(e).get[F2, O2]))
Stream.fromFreeC(get[F2, O2].handleErrorWith(e => h(e).get[F2, O2]))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a scope insertion here anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool. I remember this was done on the interruption, when we signalled it with the Throwable. If all interruption test go green this is ok.


/**
* Emits the first element of this stream (if non-empty) and then halts.
Expand Down Expand Up @@ -1603,7 +1604,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def map[O2](f: O => O2): Stream[F, O2] =
this.pull.echo.mapOutput(f).streamNoScope
this.pull.echo.mapOutput(f).stream

/**
* Maps a running total according to `S` and the input with the function `f`.
Expand Down Expand Up @@ -1751,8 +1752,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
s.chunks
.evalMap { chunk =>
guard.acquire >>
resultQ.enqueue1(
Some(Stream.chunk(chunk).onFinalize(guard.release).scope))
resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release)))
}
.interruptWhen(interrupt.get.attempt)
.compile
Expand Down Expand Up @@ -2044,8 +2044,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
.dropWhile(_ > 0)
.take(1)
.compile
.drain >> signalResult)
.scope >>
.drain >> signalResult) >>
outputQ.dequeue
.flatMap(Stream.chunk(_).covary[F2])

Expand Down Expand Up @@ -2329,17 +2328,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
def scanMonoid[O2 >: O](implicit O: Monoid[O2]): Stream[F, O2] =
scan(O.empty)(O.combine)

/**
* Scopes are typically inserted automatically, at the boundary of a pull (i.e., when a pull
* is converted to a stream). This method allows a scope to be explicitly demarcated so that
* resources can be freed earlier than when using automatically inserted scopes. This is
* useful when using `streamNoScope` to convert from `Pull` to `Stream` -- i.e., by choosing
* to *not* have scopes inserted automatically, you may end up needing to demarcate scopes
* manually at a higher level in the stream structure.
*
* Note: see the disclaimer about the use of `streamNoScope`.
*/
def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get))
private def scope: Stream[F, O] =
Stream.fromFreeC(Algebra.scope(get))

/**
* Writes this stream to the supplied `PrintStream`, converting each element to a `String` via `Show`.
Expand Down Expand Up @@ -2937,7 +2927,7 @@ object Stream extends StreamLowPriority {
release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, R] =
fromFreeC(Algebra.acquire[F, R, R](acquire, release).flatMap {
case (r, token) => Stream.emit(r).covary[F].get[F, R]
})
}).scope

/**
* Like [[bracket]] but the result value consists of a cancellation
Expand All @@ -2959,13 +2949,14 @@ object Stream extends StreamLowPriority {
*/
def bracketCaseCancellable[F[x] >: Pure[x], R](acquire: F[R])(
release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, (Stream[F, Unit], R)] =
bracketWithResource(acquire)(release).map {
case (res, r) =>
(Stream.eval(res.release(ExitCase.Canceled)).flatMap {
case Left(t) => Stream.fromFreeC(Algebra.raiseError[F, Unit](t))
case Right(u) => Stream.emit(u)
}, r)
}
bracketWithResource(acquire)(release)
.map {
case (res, r) =>
(Stream.eval(res.release(ExitCase.Canceled)).flatMap {
case Left(t) => Stream.fromFreeC(Algebra.raiseError[F, Unit](t))
case Right(u) => Stream.emit(u)
}, r)
}

private[fs2] def bracketWithResource[F[x] >: Pure[x], R](acquire: F[R])(
release: (R, ExitCase[Throwable]) => F[Unit]): Stream[F, (fs2.internal.Resource[F], R)] =
Expand All @@ -2976,7 +2967,7 @@ object Stream extends StreamLowPriority {
.covary[F]
.map(o => (res, o))
.get[F, (fs2.internal.Resource[F], R)]
})
}).scope

/**
* Creates a pure stream that emits the elements of the supplied chunk.
Expand Down Expand Up @@ -3966,7 +3957,7 @@ object Stream extends StreamLowPriority {

resourceEval {
F.delay(init())
.flatMap(i => Algebra.compile(s.get, scope, i)(foldChunk))
.flatMap(i => Algebra.compile(s.get, scope, true, i)(foldChunk))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using s.compile.resource, we enable suppression of soft scopes (aka scopes that are a result of resource acquisition) for the root scope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats an ultra-magic statement :-) I think we would need to clarify it for users.

.map(finalize)
}
}
Expand All @@ -3978,7 +3969,8 @@ object Stream extends StreamLowPriority {
private def compile[F[_], O, B](stream: FreeC[Algebra[F, O, ?], Unit], init: B)(
f: (B, Chunk[O]) => B)(implicit F: Sync[F]): F[B] =
F.bracketCase(CompileScope.newRoot[F])(scope =>
Algebra.compile[F, O, B](stream, scope, init)(f))((scope, ec) => scope.close(ec).rethrow)
Algebra.compile[F, O, B](stream, scope, false, init)(f))((scope, ec) =>
scope.close(ec).rethrow)

implicit def syncInstance[F[_]](implicit F: Sync[F]): Compiler[F, F] = new Compiler[F, F] {
def apply[O, B, C](s: Stream[F, O], init: () => B)(foldChunk: (B, Chunk[O]) => B,
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/concurrent/Balance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ object Balance {
.getStream(chunkSize)
.unNoneTerminate
.flatMap(Stream.chunk)

def push =
source.chunks
.evalMap(chunk => pubSub.publish(Some(chunk)))
Expand Down Expand Up @@ -84,7 +83,8 @@ object Balance {
def through[F[_]: Concurrent, O, O2](chunkSize: Int)(pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] =
_.balance(chunkSize)
.take(pipes.size)
.zipWith(Stream.emits(pipes)) { case (stream, pipe) => stream.through(pipe) }
.zipWithIndex
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had to change as it makes an assumption of scopes involved when zipping. With the more granular scopes used here, the leasing of the step legs was not effective.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Also we do have some tests that were covering interleaving scopes. I wonder what these tests will do

.map { case (stream, idx) => stream.through(pipes(idx.toInt)) }
.parJoinUnbounded

private def strategy[O]: PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] =
Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/fs2/concurrent/Broadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ object Broadcast {
def through[F[_]: Concurrent, O, O2](pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] =
_.through(apply(pipes.size))
.take(pipes.size)
.zipWith(Stream.emits(pipes)) { case (src, pipe) => src.through(pipe) }
.zipWithIndex
.map { case (src, idx) => src.through(pipes(idx.toInt)) }
.parJoinUnbounded

/**
Expand Down
Loading