Skip to content

Commit

Permalink
Merge pull request #3401 from typelevel/topic/conflate
Browse files Browse the repository at this point in the history
Add conflate operations to Stream
  • Loading branch information
mpilquist authored Mar 8, 2024
2 parents 5c965aa + b939a1f commit e0cdf07
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
43 changes: 42 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,48 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Stream.eval(fstream)
}

/** Pulls up to the specified number of chunks from the source stream while concurrently allowing
* downstream to process emitted chunks. Unlike `prefetchN`, all accumulated chunks are emitted
* as a single chunk upon downstream pulling.
*
* The `chunkLimit` parameter controls backpressure on the source stream.
*/
def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] =
Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan =>
val producer = chunks.through(chan.sendAll)
val consumer = chan.stream.chunks.map(_.combineAll)
consumer.concurrently(producer)
}

/** Like `conflateChunks` but uses the supplied `zero` and `f` values to combine the elements of
* each output chunk in to a single value.
*/
def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)(
f: (O2, O) => O2
): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.foldLeft(zero)(f))

/** Like `conflate` but combines elements of the output chunk with the supplied function.
*/
def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)(
f: (O2, O2) => O2
): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.iterator.reduce(f))

/** Like `conflate1` but combines elements using the semigroup of the output type.
*/
def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup](
chunkLimit: Int
): Stream[F2, O2] =
conflate1[F2, O2](chunkLimit)(Semigroup[O2].combine)

/** Conflates elements and then maps the supplied function over the output chunk and combines the results using a semigroup.
*/
def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](
chunkLimit: Int
)(f: O => O2): Stream[F2, O2] =
conflateChunks[F2](chunkLimit).map(_.iterator.map(f).reduce(Semigroup[O2].combine))

/** Prepends a chunk onto the front of this stream.
*
* @example {{{
Expand Down Expand Up @@ -2398,7 +2440,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
chan.stream.unchunks.concurrently {
chunks.through(chan.sendAll)

}
}

Expand Down
47 changes: 47 additions & 0 deletions core/shared/src/test/scala/fs2/StreamConflateSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2

import cats.effect.IO
import cats.effect.testkit.TestControl

import scala.concurrent.duration._

class StreamConflateSuite extends Fs2Suite {

test("conflateMap") {
TestControl.executeEmbed(
Stream
.iterate(0)(_ + 1)
.covary[IO]
.metered(10.millis)
.conflateMap(100)(List(_))
.metered(101.millis)
.take(5)
.compile
.toList
.assertEquals(
List(0) :: (1 until 10).toList :: 10.until(40).toList.grouped(10).toList
)
)
}
}

0 comments on commit e0cdf07

Please sign in to comment.