diff --git a/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala index e34f95e4e4..b9baf6fa1d 100644 --- a/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala +++ b/benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala @@ -36,7 +36,7 @@ class FreeCBenchmark { self.viewL match { case Result.Pure(r) => F.pure(Some(r)) case Result.Fail(e) => F.raiseError(e) - case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None)) { F.raiseError } + case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None))(F.raiseError) case _ @ViewL.View(_) => F.raiseError(new RuntimeException("Never get here)")) } } diff --git a/build.sbt b/build.sbt index b967a7fb74..cc614d5167 100644 --- a/build.sbt +++ b/build.sbt @@ -123,7 +123,7 @@ lazy val scaladocSettings = Seq( "-implicits-sound-shadowing", "-implicits-show-all" ), - scalacOptions in (Compile, doc) ~= { _.filterNot { _ == "-Xfatal-warnings" } }, + scalacOptions in (Compile, doc) ~= { _.filterNot(_ == "-Xfatal-warnings") }, autoAPIMappings := true ) @@ -140,16 +140,16 @@ lazy val publishingSettings = Seq( password <- Option(System.getenv().get("SONATYPE_PASSWORD")) } yield Credentials("Sonatype Nexus Repository Manager", "oss.sonatype.org", username, password)).toSeq, publishMavenStyle := true, - pomIncludeRepository := { _ => - false - }, + pomIncludeRepository := { _ => false }, pomExtra := { - {for ((username, name) <- contributors) yield + { + for ((username, name) <- contributors) yield {username} {name} http://github.com/{username} - } + + } }, pomPostProcess := { node => @@ -159,9 +159,7 @@ lazy val publishingSettings = Seq( override def transform(n: Node) = if (f(n)) NodeSeq.Empty else n } - val stripTestScope = stripIf { n => - n.label == "dependency" && (n \ "scope").text == "test" - } + val stripTestScope = stripIf(n => n.label == "dependency" && (n \ "scope").text == "test") new RuleTransformer(stripTestScope).transform(node)(0) }, gpgWarnOnFailure := Option(System.getenv().get("GPG_WARN_ON_FAILURE")).isDefined diff --git a/core/jvm/src/main/scala/fs2/compress.scala b/core/jvm/src/main/scala/fs2/compress.scala index 7a2985000a..92744db7ca 100644 --- a/core/jvm/src/main/scala/fs2/compress.scala +++ b/core/jvm/src/main/scala/fs2/compress.scala @@ -208,7 +208,8 @@ object compress { def push(chunk: Chunk[Byte]): Unit = { val arr: Array[Byte] = { val buf = new Array[Byte](chunk.size) - chunk.copyToArray(buf) // Note: we can be slightly better than this for Chunk.Bytes if we track incoming offsets in abis + // Note: we can be slightly better than this for Chunk.Bytes if we track incoming offsets in abis + chunk.copyToArray(buf) buf } val pushed = abis.push(arr) diff --git a/core/jvm/src/main/scala/fs2/hash.scala b/core/jvm/src/main/scala/fs2/hash.scala index 39d45ca102..db8efcbe48 100644 --- a/core/jvm/src/main/scala/fs2/hash.scala +++ b/core/jvm/src/main/scala/fs2/hash.scala @@ -40,8 +40,6 @@ object hash { d.update(bytes.values, bytes.offset, bytes.size) d } - .flatMap { d => - Stream.chunk(Chunk.bytes(d.digest())) - } + .flatMap(d => Stream.chunk(Chunk.bytes(d.digest()))) } } diff --git a/core/jvm/src/test/scala/fs2/HashSpec.scala b/core/jvm/src/test/scala/fs2/HashSpec.scala index 761e58bc83..f1110d61ae 100644 --- a/core/jvm/src/test/scala/fs2/HashSpec.scala +++ b/core/jvm/src/test/scala/fs2/HashSpec.scala @@ -19,31 +19,20 @@ class HashSpec extends Fs2Spec { else str.getBytes .grouped(n) - .foldLeft(Stream.empty.covaryOutput[Byte])((acc, c) => acc ++ Stream.chunk(Chunk.bytes(c)) + .foldLeft(Stream.empty.covaryOutput[Byte])((acc, c) => + acc ++ Stream.chunk(Chunk.bytes(c)) ) assert(s.through(h).toList == digest(algo, str)) } "digests" - { - "md2" in forAll { (s: String) => - checkDigest(md2, "MD2", s) - } - "md5" in forAll { (s: String) => - checkDigest(md5, "MD5", s) - } - "sha1" in forAll { (s: String) => - checkDigest(sha1, "SHA-1", s) - } - "sha256" in forAll { (s: String) => - checkDigest(sha256, "SHA-256", s) - } - "sha384" in forAll { (s: String) => - checkDigest(sha384, "SHA-384", s) - } - "sha512" in forAll { (s: String) => - checkDigest(sha512, "SHA-512", s) - } + "md2" in forAll((s: String) => checkDigest(md2, "MD2", s)) + "md5" in forAll((s: String) => checkDigest(md5, "MD5", s)) + "sha1" in forAll((s: String) => checkDigest(sha1, "SHA-1", s)) + "sha256" in forAll((s: String) => checkDigest(sha256, "SHA-256", s)) + "sha384" in forAll((s: String) => checkDigest(sha384, "SHA-384", s)) + "sha512" in forAll((s: String) => checkDigest(sha512, "SHA-512", s)) } "empty input" in { diff --git a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala b/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala index d5c7f7a001..8fe7754898 100644 --- a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala +++ b/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala @@ -97,11 +97,7 @@ object DanglingDequeueSanityTest extends App { implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) Stream .eval(Queue.unbounded[IO, Int]) - .flatMap { q => - Stream.constant(1).flatMap { _ => - Stream.empty.mergeHaltBoth(q.dequeue) - } - } + .flatMap(q => Stream.constant(1).flatMap(_ => Stream.empty.mergeHaltBoth(q.dequeue))) .compile .drain .unsafeRunSync @@ -113,9 +109,7 @@ object AwakeEverySanityTest extends App { implicit val timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global) Stream .awakeEvery[IO](1.millis) - .flatMap { _ => - Stream.eval(IO(())) - } + .flatMap(_ => Stream.eval(IO(()))) .compile .drain .unsafeRunSync @@ -125,9 +119,7 @@ object SignalDiscreteSanityTest extends App { implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) Stream .eval(SignallingRef[IO, Unit](())) - .flatMap { signal => - signal.discrete.evalMap(a => signal.set(a)) - } + .flatMap(signal => signal.discrete.evalMap(a => signal.set(a))) .compile .drain .unsafeRunSync @@ -137,9 +129,7 @@ object SignalContinuousSanityTest extends App { implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) Stream .eval(SignallingRef[IO, Unit](())) - .flatMap { signal => - signal.continuous.evalMap(a => signal.set(a)) - } + .flatMap(signal => signal.continuous.evalMap(a => signal.set(a))) .compile .drain .unsafeRunSync diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala index c32d4e8fcb..4dd51eea27 100644 --- a/core/shared/src/main/scala/fs2/Hotswap.scala +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -107,8 +107,8 @@ object Hotswap { Resource.make(initialize)(finalize).map { state => new Hotswap[F, R] { override def swap(next: Resource[F, R]): F[R] = - (next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579 - .allocated + // workaround for https://github.com/typelevel/cats-effect/issues/579 + (next <* ().pure[Resource[F, ?]]).allocated .continual { r => // this whole block is inside continual and cannot be canceled Sync[F].fromEither(r).flatMap { case (newValue, newFinalizer) => diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 100427eb46..a28000a0e2 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -112,7 +112,7 @@ object Pull extends PullLowPriority { * Halts when a step terminates with `None` or `Pull.raiseError`. */ def loop[F[_], O, R](using: R => Pull[F, O, Option[R]]): R => Pull[F, O, Option[R]] = - r => using(r).flatMap { _.map(loop(using)).getOrElse(Pull.pure(None)) } + r => using(r).flatMap(_.map(loop(using)).getOrElse(Pull.pure(None))) /** Outputs a single value. */ def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] = diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a55c910a79..60bc20499f 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1010,16 +1010,12 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Unit] = s.pull.uncons1.flatMap { case Some((hd, tl)) => - Pull.eval(f(z, hd)).flatMap { o => - Pull.output1(o) >> go(o, tl) - } + Pull.eval(f(z, hd)).flatMap(o => Pull.output1(o) >> go(o, tl)) case None => Pull.done } this.pull.uncons1.flatMap { case Some((hd, tl)) => - Pull.eval(f(z, hd)).flatMap { o => - Pull.output(Chunk.seq(List(z, o))) >> go(o, tl) - } + Pull.eval(f(z, hd)).flatMap(o => Pull.output(Chunk.seq(List(z, o))) >> go(o, tl)) case None => Pull.output1(z) }.stream } @@ -1466,9 +1462,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) // this is the same if in the resize function, // short circuited to avoid needlessly converting newAcc.toChunk if (newAcc.size < n) { - Stream.empty ++ startTimeout.flatMap { newTimeout => - go(newAcc, newTimeout) - } + Stream.empty ++ startTimeout.flatMap(newTimeout => go(newAcc, newTimeout)) } else { val (toEmit, rest) = resize(newAcc.toChunk, Stream.empty) toEmit ++ startTimeout.flatMap { newTimeout => @@ -1481,9 +1475,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) } startTimeout - .flatMap { t => - go(Chunk.Queue.empty, t).concurrently(producer) - } + .flatMap(t => go(Chunk.Queue.empty, t).concurrently(producer)) .onFinalize { currentTimeout.modify { case (cancelInFlightTimeout, _) => @@ -1539,9 +1531,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) )(implicit F: Concurrent[F2]): Resource[F2, Signal[F2, O2]] = Stream .eval(SignallingRef[F2, O2](initial)) - .flatMap { sig => - Stream(sig).concurrently(evalMap(sig.set)) - } + .flatMap(sig => Stream(sig).concurrently(evalMap(sig.set))) .compile .resource .lastOrError @@ -1619,7 +1609,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) Stream.bracket(F2.start(runR))(_ => interruptR.complete(()) >> - doneR.get.flatMap { F2.fromEither } + doneR.get.flatMap(F2.fromEither) ) >> this.interruptWhen(interruptL.get.attempt) } } @@ -1643,9 +1633,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) )(implicit F2: Concurrent[F2]): Stream[F2, O] = Stream .getScope[F2] - .flatMap { scope => - Stream.supervise(haltOnSignal.flatMap(scope.interrupt)) >> this - } + .flatMap(scope => Stream.supervise(haltOnSignal.flatMap(scope.interrupt)) >> this) .interruptScope /** @@ -1914,9 +1902,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) interrupt.complete(()).attempt.void // we need to attempt interruption in case the interrupt was already completed. else otherSideDone - .modify { prev => - (true, prev) - } + .modify(prev => (true, prev)) .flatMap { otherDone => if (otherDone) resultQ @@ -2170,7 +2156,9 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) F2.start { inner.chunks .evalMap(s => outputQ.enqueue1(Some(s))) - .interruptWhen(done.map(_.nonEmpty)) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enqueue last item while being interrupted + .interruptWhen( + done.map(_.nonEmpty) + ) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enqueue last item while being interrupted .compile .drain .attempt @@ -2197,9 +2185,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) def runOuter: F2[Unit] = outer .flatMap { inner => - Stream.getScope[F2].evalMap { outerScope => - runInner(inner, outerScope) - } + Stream.getScope[F2].evalMap(outerScope => runInner(inner, outerScope)) } .interruptWhen(done.map(_.nonEmpty)) .compile @@ -4059,7 +4045,7 @@ object Stream extends StreamLowPriority { /** Like `[[unconsN]]`, but leaves the buffered input unconsumed. */ def fetchN(n: Int): Pull[F, INothing, Option[Stream[F, O]]] = - unconsN(n).map { _.map { case (hd, tl) => tl.cons(hd) } } + unconsN(n).map(_.map { case (hd, tl) => tl.cons(hd) }) /** Awaits the next available element where the predicate returns true. */ def find(f: O => Boolean): Pull[F, INothing, Option[(O, Stream[F, O])]] = @@ -4640,9 +4626,9 @@ object Stream extends StreamLowPriority { */ def stream: Stream[F, O] = Pull - .loop[F, O, StepLeg[F, O]] { leg => - Pull.output(leg.head).flatMap(_ => leg.stepLeg) - }(self.setHead(Chunk.empty)) + .loop[F, O, StepLeg[F, O]](leg => Pull.output(leg.head).flatMap(_ => leg.stepLeg))( + self.setHead(Chunk.empty) + ) .void .stream diff --git a/core/shared/src/main/scala/fs2/concurrent/Balance.scala b/core/shared/src/main/scala/fs2/concurrent/Balance.scala index 2ddbe7073b..75d75b0c3c 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Balance.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Balance.scala @@ -90,7 +90,8 @@ object Balance { private def strategy[O]: PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] = new PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] { def initial: Option[Chunk[O]] = - Some(Chunk.empty) // causes to block first push, hence all the other chunks must be non-empty. + // causes to block first push, hence all the other chunks must be non-empty. + Some(Chunk.empty) def accepts(i: Chunk[O], state: Option[Chunk[O]]): Boolean = state.isEmpty diff --git a/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala b/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala index 917f3a0af1..17528050e6 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Broadcast.scala @@ -98,11 +98,12 @@ object Broadcast { def awaitSub = false def isEmpty = true } - case class Processing[O]( subscribers: Set[Token], - processing: Set[Token], // added when we enter to Processing state, and removed whenever sub takes current `O` - remains: Set[Token], // removed when subscriber requests another `O` but already seen `current` + // added when we enter to Processing state, and removed whenever sub takes current `O` + processing: Set[Token], + // removed when subscriber requests another `O` but already seen `current` + remains: Set[Token], current: O ) extends State[O] { def awaitSub = false diff --git a/core/shared/src/main/scala/fs2/concurrent/PubSub.scala b/core/shared/src/main/scala/fs2/concurrent/PubSub.scala index ca440e5a87..99462da6f9 100644 --- a/core/shared/src/main/scala/fs2/concurrent/PubSub.scala +++ b/core/shared/src/main/scala/fs2/concurrent/PubSub.scala @@ -210,15 +210,11 @@ private[fs2] object PubSub { def clearPublisher(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] = exitCase match { case ExitCase.Completed => Applicative[F].unit case ExitCase.Error(_) | ExitCase.Canceled => - state.update { ps => - ps.copy(publishers = ps.publishers.filterNot(_.token == token)) - } + state.update(ps => ps.copy(publishers = ps.publishers.filterNot(_.token == token))) } def clearSubscriber(token: Token): F[Unit] = - state.update { ps => - ps.copy(subscribers = ps.subscribers.filterNot(_.token == token)) - } + state.update(ps => ps.copy(subscribers = ps.subscribers.filterNot(_.token == token))) def clearSubscriberOnCancel(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] = exitCase match { @@ -456,11 +452,7 @@ private[fs2] object PubSub { Some(strategy.initial) def accepts(i: Option[I], state: Option[S]): Boolean = - i.forall { el => - state.exists { s => - strategy.accepts(el, s) - } - } + i.forall(el => state.exists(s => strategy.accepts(el, s))) def publish(i: Option[I], state: Option[S]): Option[S] = i match { @@ -487,9 +479,7 @@ private[fs2] object PubSub { (Some(s1), success) } def unsubscribe(selector: Sel, state: Option[S]): Option[S] = - state.map { s => - strategy.unsubscribe(selector, s) - } + state.map(s => strategy.unsubscribe(selector, s)) } /** diff --git a/core/shared/src/main/scala/fs2/concurrent/Queue.scala b/core/shared/src/main/scala/fs2/concurrent/Queue.scala index 5b06d79bd1..ac8e125022 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Queue.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Queue.scala @@ -213,9 +213,7 @@ object Queue { pubSub.getStream(maxSize).flatMap(Stream.chunk) def dequeueBatch: Pipe[F, Int, A] = - _.flatMap { sz => - Stream.evalUnChunk(pubSub.get(sz)) - } + _.flatMap(sz => Stream.evalUnChunk(pubSub.get(sz))) } } } @@ -521,7 +519,7 @@ object InspectableQueue { } def peek1: F[A] = - Sync[F].bracket(Sync[F].delay(new Token))({ token => + Sync[F].bracket(Sync[F].delay(new Token)) { token => def take: F[A] = pubSub.get(Left(Some(token))).flatMap { case Left(s) => @@ -539,7 +537,7 @@ object InspectableQueue { } take - })(token => pubSub.unsubscribe(Left(Some(token)))) + }(token => pubSub.unsubscribe(Left(Some(token)))) def size: Stream[F, Int] = Stream diff --git a/core/shared/src/main/scala/fs2/concurrent/Signal.scala b/core/shared/src/main/scala/fs2/concurrent/Signal.scala index 329537f85f..5cbb85818f 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Signal.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Signal.scala @@ -178,9 +178,7 @@ object SignallingRef { } def tryUpdate(f: A => A): F[Boolean] = - tryModify { a => - (f(a), ()) - }.map(_.nonEmpty) + tryModify(a => (f(a), ())).map(_.nonEmpty) def tryModify[B](f: A => (A, B)): F[Option[B]] = ref.tryModify(modify_(f)).flatMap { @@ -189,9 +187,7 @@ object SignallingRef { } def update(f: A => A): F[Unit] = - modify { a => - (f(a), ()) - } + modify(a => (f(a), ())) def modify[B](f: A => (A, B)): F[B] = ref.modify(modify_(f)).flatMap { diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index 4e80e04235..350afd7161 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -121,7 +121,7 @@ private[fs2] object Algebra { translate0[F, G, O](u, s, G.concurrentInstance) def uncons[F[_], X, O](s: FreeC[F, O, Unit]): FreeC[F, X, Option[(Chunk[O], FreeC[F, O, Unit])]] = - Step(s, None).map { _.map { case (h, _, t) => (h, t.asInstanceOf[FreeC[F, O, Unit]]) } } + Step(s, None).map(_.map { case (h, _, t) => (h, t.asInstanceOf[FreeC[F, O, Unit]]) }) /** Left-folds the output of a stream. */ def compile[F[_], O, B]( diff --git a/core/shared/src/main/scala/fs2/internal/CompileScope.scala b/core/shared/src/main/scala/fs2/internal/CompileScope.scala index 2a50e259d9..2478f18e3e 100644 --- a/core/shared/src/main/scala/fs2/internal/CompileScope.scala +++ b/core/shared/src/main/scala/fs2/internal/CompileScope.scala @@ -76,9 +76,7 @@ private[fs2] final class CompileScope[F[_]] private ( * This is always invoked before state can be marked as closed. */ private def register(resource: Resource[F]): F[Unit] = - state.update { s => - s.copy(resources = resource +: s.resources) - } + state.update(s => s.copy(resources = resource +: s.resources)) /** * Opens a child scope. @@ -177,11 +175,11 @@ private[fs2] final class CompileScope[F[_]] private ( * reachable from its parent. */ private def releaseChildScope(id: Token): F[Unit] = - state.update { _.unregisterChild(id) } + state.update(_.unregisterChild(id)) /** Returns all direct resources of this scope (does not return resources in ancestor scopes or child scopes). **/ private def resources: F[Chain[Resource[F]]] = - F.map(state.get) { _.resources } + F.map(state.get)(_.resources) /** * Traverses supplied `Chain` with `f` that may produce a failure, and collects these failures. @@ -281,9 +279,7 @@ private[fs2] final class CompileScope[F[_]] private ( } if (self.id == scopeId) F.pure(Some(self)) else - F.flatMap(state.get) { s => - go(s.children) - } + F.flatMap(state.get)(s => go(s.children)) } /** @@ -323,9 +319,7 @@ private[fs2] final class CompileScope[F[_]] private ( else { val allScopes = (s.children :+ self) ++ ancestors F.flatMap(Traverse[Chain].flatTraverse(allScopes)(_.resources)) { allResources => - F.map(TraverseFilter[Chain].traverseFilter(allResources) { r => - r.lease - }) { allLeases => + F.map(TraverseFilter[Chain].traverseFilter(allResources)(r => r.lease)) { allLeases => val lease = new Scope.Lease[F] { def cancel: F[Either[Throwable, Unit]] = traverseError[Scope.Lease[F]](allLeases, _.cancel) @@ -347,7 +341,7 @@ private[fs2] final class CompileScope[F[_]] private ( // note that we guard interruption here by Attempt to prevent failure on multiple sets. val interruptCause = cause.map(_ => iCtx.interruptRoot) F.guarantee(iCtx.deferred.complete(interruptCause)) { - iCtx.ref.update { _.orElse(Some(interruptCause)) } + iCtx.ref.update(_.orElse(Some(interruptCause))) } } @@ -377,7 +371,7 @@ private[fs2] final class CompileScope[F[_]] private ( */ private[internal] def interruptibleEval[A](f: F[A]): F[Either[Either[Throwable, Token], A]] = interruptible match { - case None => F.map(F.attempt(f)) { _.swap.map(Left(_)).swap } + case None => F.map(F.attempt(f))(_.swap.map(Left(_)).swap) case Some(iCtx) => F.map( iCtx.concurrent @@ -492,9 +486,7 @@ private[fs2] object CompileScope { } ) ) - ) { _ => - context - } + )(_ => context) } } .getOrElse(F.pure(copy(cancelParent = F.unit))) diff --git a/core/shared/src/main/scala/fs2/internal/Resource.scala b/core/shared/src/main/scala/fs2/internal/Resource.scala index 90ad5d4714..7e53de095b 100644 --- a/core/shared/src/main/scala/fs2/internal/Resource.scala +++ b/core/shared/src/main/scala/fs2/internal/Resource.scala @@ -112,10 +112,13 @@ private[internal] object Resource { def release(ec: ExitCase[Throwable]): F[Either[Throwable, Unit]] = F.flatMap(state.modify { s => - if (s.leases != 0) - (s.copy(open = false), None) // do not allow to run finalizer if there are leases open - else - (s.copy(open = false, finalizer = None), s.finalizer) // reset finalizer to None, will be run, it available, otherwise the acquire will take care of it + if (s.leases != 0) { + // do not allow to run finalizer if there are leases open + (s.copy(open = false), None) + } else { + // reset finalizer to None, will be run, it available, otherwise the acquire will take care of it + (s.copy(open = false, finalizer = None), s.finalizer) + } })(finalizer => finalizer.map(_(ec)).getOrElse(pru)) def acquired(finalizer: ExitCase[Throwable] => F[Unit]): F[Either[Throwable, Boolean]] = diff --git a/core/shared/src/test/scala/fs2/ChunkSpec.scala b/core/shared/src/test/scala/fs2/ChunkSpec.scala index 8ff9af99f2..16ca8bd6ca 100644 --- a/core/shared/src/test/scala/fs2/ChunkSpec.scala +++ b/core/shared/src/test/scala/fs2/ChunkSpec.scala @@ -64,18 +64,14 @@ class ChunkSpec extends Fs2Spec { ): Unit = s"$name" - { implicit val implicitChunkGenerator: Generator[Chunk[A]] = genChunk - "size" in forAll { (c: Chunk[A]) => - assert(c.size == c.toList.size) - } + "size" in forAll((c: Chunk[A]) => assert(c.size == c.toList.size)) "take" in forAll { (c: Chunk[A], n: PosZInt) => assert(c.take(n).toVector == c.toVector.take(n)) } "drop" in forAll { (c: Chunk[A], n: PosZInt) => assert(c.drop(n).toVector == c.toVector.drop(n)) } - "isEmpty" in forAll { (c: Chunk[A]) => - assert(c.isEmpty == c.toList.isEmpty) - } + "isEmpty" in forAll((c: Chunk[A]) => assert(c.isEmpty == c.toList.isEmpty)) "toArray" in forAll { c: Chunk[A] => assert(c.toArray.toVector == c.toVector) // Do it twice to make sure the first time didn't mutate state diff --git a/core/shared/src/test/scala/fs2/HotswapSpec.scala b/core/shared/src/test/scala/fs2/HotswapSpec.scala index 0331d40b3b..cbe2f645a5 100644 --- a/core/shared/src/test/scala/fs2/HotswapSpec.scala +++ b/core/shared/src/test/scala/fs2/HotswapSpec.scala @@ -9,9 +9,7 @@ class HotswapSpec extends Fs2Spec { Logger[IO].flatMap { logger => Stream .resource(Hotswap(logger.logLifecycleR("a"))) - .flatMap { _ => - logger.logInfo("using") - } + .flatMap(_ => logger.logInfo("using")) .compile .drain *> logger.get.asserting(it => assert( diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index 28e16d10bd..9a6e29ff67 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -33,13 +33,9 @@ class StreamSpec extends Fs2Spec with Matchers { .toVector .asserting { r => r.sliding(2) - .map { s => - (s.head, s.tail.head) - } + .map(s => (s.head, s.tail.head)) .map { case (prev, next) => next - prev } - .foreach { delta => - assert(delta +- 150 === 500L) - } + .foreach(delta => assert(delta +- 150 === 500L)) Succeeded } } @@ -47,9 +43,7 @@ class StreamSpec extends Fs2Spec with Matchers { "liveness" in { val s = Stream .awakeEvery[IO](1.milli) - .evalMap { _ => - IO.async[Unit](cb => executionContext.execute(() => cb(Right(())))) - } + .evalMap(_ => IO.async[Unit](cb => executionContext.execute(() => cb(Right(()))))) .take(200) Stream(s, s, s, s, s).parJoin(5).compile.drain.assertNoException } @@ -70,18 +64,12 @@ class StreamSpec extends Fs2Spec with Matchers { for { events <- Ref.of[F, Vector[BracketEvent]](Vector.empty) _ <- recordBracketEvents(events) - .evalMap(_ => - events.get.asserting { events => - assert(events == Vector(Acquired)) - } - ) + .evalMap(_ => events.get.asserting(events => assert(events == Vector(Acquired)))) .flatMap(_ => use) .compile .drain .handleErrorWith { case _: Err => Sync[F].pure(()) } - _ <- events.get.asserting { it => - assert(it == Vector(Acquired, Released)) - } + _ <- events.get.asserting(it => assert(it == Vector(Acquired, Released))) } yield () "normal termination" in { singleBracketTest[SyncIO, Unit](Stream.empty) } @@ -215,7 +203,7 @@ class StreamSpec extends Fs2Spec with Matchers { var o: Vector[Int] = Vector.empty (0 until 10) .foldLeft(Stream.eval(IO(0))) { (acc, i) => - Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(_ => acc) + Stream.bracket(IO(i))(i => IO { o = o :+ i; () }).flatMap(_ => acc) } .compile .drain @@ -228,7 +216,9 @@ class StreamSpec extends Fs2Spec with Matchers { var o: Vector[Int] = Vector.empty (0 until 10) .foldLeft(Stream.emit(1).map(_ => throw new Err): Stream[IO, Int]) { (acc, i) => - Stream.emit(i) ++ Stream.bracket(IO(i))(i => IO { o = o :+ i }).flatMap(_ => acc) + Stream.emit(i) ++ Stream + .bracket(IO(i))(i => IO { o = o :+ i; () }) + .flatMap(_ => acc) } .attempt .compile @@ -268,7 +258,7 @@ class StreamSpec extends Fs2Spec with Matchers { val s = s0.map { s => Stream .bracketCase(counter.increment) { (_, ec) => - counter.decrement >> IO { ecs = ecs :+ ec } + counter.decrement >> IO { ecs = ecs :+ ec; () } } .flatMap(_ => s) } @@ -288,7 +278,7 @@ class StreamSpec extends Fs2Spec with Matchers { val s = s0.map { s => Stream .bracketCase(counter.increment) { (_, ec) => - counter.decrement >> IO { ecs = ecs :+ ec } + counter.decrement >> IO { ecs = ecs :+ ec; () } } .flatMap(_ => s ++ Stream.raiseError[IO](new Err)) } @@ -308,7 +298,7 @@ class StreamSpec extends Fs2Spec with Matchers { val s = Stream .bracketCase(counter.increment) { (_, ec) => - counter.decrement >> IO { ecs = ecs :+ ec } + counter.decrement >> IO { ecs = ecs :+ ec; () } } .flatMap(_ => s0 ++ Stream.never[IO]) s.compile.drain.start @@ -330,7 +320,7 @@ class StreamSpec extends Fs2Spec with Matchers { val s = Stream .bracketCase(counter.increment) { (_, ec) => - counter.decrement >> IO { ecs = ecs :+ ec } + counter.decrement >> IO { ecs = ecs :+ ec; () } } .flatMap(_ => s0 ++ Stream.never[IO]) s.interruptAfter(50.millis).compile.drain.flatMap(_ => counter.get).asserting { count => @@ -353,9 +343,7 @@ class StreamSpec extends Fs2Spec with Matchers { IO.suspend { var counter = 0 val s2 = s.append(Stream.emits(List.fill(n + 1)(0))).repeat - s2.evalMap { i => - IO { counter += 1; i } - } + s2.evalMap(i => IO { counter += 1; i }) .buffer(n) .take(n + 1) .compile @@ -366,18 +354,14 @@ class StreamSpec extends Fs2Spec with Matchers { } "bufferAll" - { - "identity" in forAll { (s: Stream[Pure, Int]) => - assert(s.bufferAll.toVector == s.toVector) - } + "identity" in forAll((s: Stream[Pure, Int]) => assert(s.bufferAll.toVector == s.toVector)) "buffer results of evalMap" in forAll { (s: Stream[Pure, Int]) => val expected = s.toList.size * 2 IO.suspend { var counter = 0 s.append(s) - .evalMap { i => - IO { counter += 1; i } - } + .evalMap(i => IO { counter += 1; i }) .bufferAll .take(s.toList.size + 1) .compile @@ -397,9 +381,7 @@ class StreamSpec extends Fs2Spec with Matchers { IO.suspend { var counter = 0 val s2 = s.map(x => if (x == Int.MinValue) x + 1 else x).map(_.abs) - val s3 = s2.append(Stream.emit(-1)).append(s2).evalMap { i => - IO { counter += 1; i } - } + val s3 = s2.append(Stream.emit(-1)).append(s2).evalMap(i => IO { counter += 1; i }) s3.bufferBy(_ >= 0) .take(s.toList.size + 2) .compile @@ -444,9 +426,7 @@ class StreamSpec extends Fs2Spec with Matchers { Stream( Stream .unfold(0)(i => (i + 1, i + 1).some) - .flatMap { i => - Stream.sleep_(50.milliseconds) ++ Stream.emit(i) - } + .flatMap(i => Stream.sleep_(50.milliseconds) ++ Stream.emit(i)) .through(q.enqueue), q.dequeue.drain ).parJoin(2) @@ -465,9 +445,7 @@ class StreamSpec extends Fs2Spec with Matchers { } "chunk" in { - forAll { (c: Chunk[Int]) => - assert(Stream.chunk(c).compile.to(Chunk) == c) - } + forAll((c: Chunk[Int]) => assert(Stream.chunk(c).compile.to(Chunk) == c)) } "chunkLimit" in forAll { (s: Stream[Pure, Int], n0: PosInt) => @@ -704,14 +682,10 @@ class StreamSpec extends Fs2Spec with Matchers { .compile .resource .drain - .use { _ => - IO.unit - } + .use(_ => IO.unit) .guaranteeCase(stop.complete) - r.start.flatMap { fiber => - IO.sleep(200.millis) >> fiber.cancel >> stop.get - } + r.start.flatMap(fiber => IO.sleep(200.millis) >> fiber.cancel >> stop.get) } p.timeout(2.seconds) .asserting(it => assert(it == ExitCase.Canceled)) @@ -802,9 +776,7 @@ class StreamSpec extends Fs2Spec with Matchers { Stream .bracket(IO.unit)(_ => finRef.update(_ :+ "Outer")) - .flatMap { _ => - s.covary[IO].concurrently(runner) - } + .flatMap(_ => s.covary[IO].concurrently(runner)) .interruptWhen(halt.get.attempt) .compile .drain @@ -817,7 +789,8 @@ class StreamSpec extends Fs2Spec with Matchers { // exception shall be thrown assert(finalizers == List("Inner", "Outer")) assert(r.swap.toOption.get.isInstanceOf[Err]) - } else + } + else IO { // still the outer finalizer shall be run, but there is no failure in `s` assert(finalizers == List("Outer")) @@ -1150,12 +1123,12 @@ class StreamSpec extends Fs2Spec with Matchers { (IO.shift >> durationsSinceSpike.compile.toVector).unsafeToFuture().map { result => val list = result.toList - withClue("every always emits true first") { assert(list.head._1) } + withClue("every always emits true first")(assert(list.head._1)) withClue(s"true means the delay has passed: ${list.tail}") { - assert(list.tail.filter(_._1).map(_._2).forall { _ >= delay }) + assert(list.tail.filter(_._1).map(_._2).forall(_ >= delay)) } withClue(s"false means the delay has not passed: ${list.tail}") { - assert(list.tail.filterNot(_._1).map(_._2).forall { _ <= delay }) + assert(list.tail.filterNot(_._1).map(_._2).forall(_ <= delay)) } } } @@ -1416,10 +1389,8 @@ class StreamSpec extends Fs2Spec with Matchers { Pull .pure(1) .covary[SyncIO] - .handleErrorWith(_ => { i += 1; Pull.pure(2) }) - .flatMap { _ => - Pull.output1(i) >> Pull.raiseError[SyncIO](new Err) - } + .handleErrorWith { _ => i += 1; Pull.pure(2) } + .flatMap(_ => Pull.output1(i) >> Pull.raiseError[SyncIO](new Err)) .stream .compile .drain @@ -1433,10 +1404,8 @@ class StreamSpec extends Fs2Spec with Matchers { var i = 0 Pull .eval(SyncIO(1)) - .handleErrorWith(_ => { i += 1; Pull.pure(2) }) - .flatMap { _ => - Pull.output1(i) >> Pull.raiseError[SyncIO](new Err) - } + .handleErrorWith { _ => i += 1; Pull.pure(2) } + .flatMap(_ => Pull.output1(i) >> Pull.raiseError[SyncIO](new Err)) .stream .compile .drain @@ -1453,10 +1422,8 @@ class StreamSpec extends Fs2Spec with Matchers { .flatMap { x => Pull .pure(x) - .handleErrorWith(_ => { i += 1; Pull.pure(2) }) - .flatMap { _ => - Pull.output1(i) >> Pull.raiseError[SyncIO](new Err) - } + .handleErrorWith { _ => i += 1; Pull.pure(2) } + .flatMap(_ => Pull.output1(i) >> Pull.raiseError[SyncIO](new Err)) } .stream .compile @@ -1472,9 +1439,7 @@ class StreamSpec extends Fs2Spec with Matchers { Pull .eval(SyncIO(???)) .handleErrorWith(_ => Pull.pure(i += 1)) - .flatMap { _ => - Pull.output1(i) - } + .flatMap(_ => Pull.output1(i)) .stream .compile .drain @@ -1502,9 +1467,7 @@ class StreamSpec extends Fs2Spec with Matchers { .range(0, 10) .covary[SyncIO] .append(Stream.raiseError[SyncIO](new Err)) - .handleErrorWith { _ => - i += 1; Stream.empty - } + .handleErrorWith { _ => i += 1; Stream.empty } .compile .drain .asserting(_ => assert(i == 1)) @@ -1531,9 +1494,7 @@ class StreamSpec extends Fs2Spec with Matchers { (Stream .range(0, 3) .covary[SyncIO] ++ Stream.raiseError[SyncIO](new Err)).unchunk.pull.echo - .handleErrorWith { _ => - i += 1; Pull.done - } + .handleErrorWith { _ => i += 1; Pull.done } .stream .compile .drain @@ -1562,9 +1523,7 @@ class StreamSpec extends Fs2Spec with Matchers { } } - "head" in forAll { (s: Stream[Pure, Int]) => - assert(s.head.toList == s.toList.take(1)) - } + "head" in forAll((s: Stream[Pure, Int]) => assert(s.head.toList == s.toList.take(1))) "interleave" - { "interleave left/right side infinite" in { @@ -1697,9 +1656,8 @@ class StreamSpec extends Fs2Spec with Matchers { val interrupt = Stream.sleep_[IO](20.millis).compile.drain.attempt - def loop(i: Int): Stream[IO, Int] = Stream.emit(i).covary[IO].flatMap { i => - Stream.emit(i) ++ loop(i + 1) - } + def loop(i: Int): Stream[IO, Int] = + Stream.emit(i).covary[IO].flatMap(i => Stream.emit(i) ++ loop(i + 1)) loop(0) .interruptWhen(interrupt) @@ -1780,9 +1738,8 @@ class StreamSpec extends Fs2Spec with Matchers { s.covary[IO] .evalMap { i => // enable interruption and hang when hitting a value divisible by 7 - if (i % 7 == 0) enableInterrupt.release.flatMap { _ => - barrier.acquire.as(i) - } else IO.pure(i) + if (i % 7 == 0) enableInterrupt.release.flatMap(_ => barrier.acquire.as(i)) + else IO.pure(i) } .interruptWhen(interrupt) } @@ -2040,7 +1997,7 @@ class StreamSpec extends Fs2Spec with Matchers { } "map" - { - "map.toList == toList.map" in forAll { (s: Stream[Pure, Int], f: Int => Int) => + "map.toList == toList.map" in forAll { (s: Stream[Pure, Int], f: Function1[Int, Int]) => assert(s.map(f).toList == s.toList.map(f)) } @@ -2188,15 +2145,18 @@ class StreamSpec extends Fs2Spec with Matchers { ) assert(finalizers.lastOption == Some("Outer")) assert(r == Left(err)) - } else if (left) IO { + } + else if (left) IO { assert(finalizers == List("Inner L", "Outer")) if (leftBiased) assert(r == Left(err)) else assert(r == Right(())) - } else if (right) IO { + } + else if (right) IO { assert(finalizers == List("Inner R", "Outer")) if (!leftBiased) assert(r == Left(err)) else assert(r == Right(())) - } else + } + else IO { assert(finalizers == List("Outer")) assert(r == Right(())) @@ -2358,7 +2318,9 @@ class StreamSpec extends Fs2Spec with Matchers { Stream .eval(IO(1)) .append(Stream.eval(IO.raiseError(new Err))) - .observe(_.evalMap(_ => IO.sleep(100.millis))) //Have to do some work here, so that we give time for the underlying stream to try pull more + .observe( + _.evalMap(_ => IO.sleep(100.millis)) + ) //Have to do some work here, so that we give time for the underlying stream to try pull more .take(1) .compile .toList @@ -2511,9 +2473,9 @@ class StreamSpec extends Fs2Spec with Matchers { finalizerRef.get.flatMap { finalizers => runEvidenceRef.get.flatMap { streamRunned => IO { - val expectedFinalizers = (streamRunned.map { idx => + val expectedFinalizers = streamRunned.map { idx => s"Inner $idx" - }) :+ "Outer" + } :+ "Outer" assert(containSameElements(finalizers, expectedFinalizers)) assert(finalizers.lastOption == Some("Outer")) if (streamRunned.contains(biasIdx)) assert(r == Left(err)) @@ -2621,9 +2583,7 @@ class StreamSpec extends Fs2Spec with Matchers { Stream(1, 2, 3) .evalMap(i => IO.sleep(1.second).as(i)) .prefetch - .flatMap { i => - Stream.eval(IO.sleep(1.second).as(i)) - } + .flatMap(i => Stream.eval(IO.sleep(1.second).as(i))) .compile .toList .asserting { _ => @@ -3139,7 +3099,7 @@ class StreamSpec extends Fs2Spec with Matchers { val job = { val start = System.currentTimeMillis() IO { - delays.synchronized { delays += System.currentTimeMillis() - start } + delays.synchronized(delays += System.currentTimeMillis() - start) throw RetryErr() } } @@ -3373,7 +3333,8 @@ class StreamSpec extends Fs2Spec with Matchers { Stream.eval(Ref[IO].of(false)).flatMap { innerReleased => s.delayBy[IO](25.millis) .onFinalize(innerReleased.get.flatMap(inner => verdict.complete(inner))) - .switchMap(_ => Stream.raiseError[IO](new Err).onFinalize(innerReleased.set(true)) + .switchMap(_ => + Stream.raiseError[IO](new Err).onFinalize(innerReleased.set(true)) ) .attempt .drain ++ @@ -3407,9 +3368,7 @@ class StreamSpec extends Fs2Spec with Matchers { } } - "tail" in forAll { (s: Stream[Pure, Int]) => - assert(s.tail.toList == s.toList.drop(1)) - } + "tail" in forAll((s: Stream[Pure, Int]) => assert(s.tail.toList == s.toList.drop(1))) "take" - { "identity" in forAll { (s: Stream[Pure, Int], negate: Boolean, n0: PosInt) => @@ -3522,12 +3481,8 @@ class StreamSpec extends Fs2Spec with Matchers { case Some(step) => Pull.output(step.head) >> step.stepLeg.flatMap(goStep) } (Stream.eval(() => 1) ++ Stream.eval(() => 2)) - .flatMap { a => - Stream.emit(a) - } - .flatMap { a => - Stream.eval(() => a + 1) ++ Stream.eval(() => a + 2) - } + .flatMap(a => Stream.emit(a)) + .flatMap(a => Stream.eval(() => a + 1) ++ Stream.eval(() => a + 2)) .pull .stepLeg .flatMap(goStep) @@ -3756,7 +3711,7 @@ class StreamSpec extends Fs2Spec with Matchers { } "2" in { val s = Stream(0).scope - assertThrows[Throwable] { brokenZip(s ++ s, s.zip(s)).compile.toList } + assertThrows[Throwable](brokenZip(s ++ s, s.zip(s)).compile.toList) } "3" in { Logger[IO] diff --git a/core/shared/src/test/scala/fs2/TextSpec.scala b/core/shared/src/test/scala/fs2/TextSpec.scala index a5d2aa4bac..9844fac361 100644 --- a/core/shared/src/test/scala/fs2/TextSpec.scala +++ b/core/shared/src/test/scala/fs2/TextSpec.scala @@ -51,9 +51,7 @@ class TextSpec extends Fs2Spec { ) } - "all chars" in forAll { (c: Char) => - checkChar(c) - } + "all chars" in forAll((c: Char) => checkChar(c)) "1 byte char" in checkBytes(0x24) // $ "2 byte char" in checkBytes(0xC2, 0xA2) // ยข @@ -65,7 +63,7 @@ class TextSpec extends Fs2Spec { "incomplete 4 byte char" in checkBytes(0xF0, 0xA4, 0xAD) "preserve complete inputs" in forAll { (l0: List[String]) => - val l = l0.filter { _.nonEmpty } + val l = l0.filter(_.nonEmpty) assert(Stream(l: _*).map(utf8Bytes).flatMap(Stream.chunk).through(utf8Decode).toList == l) assert(Stream(l0: _*).map(utf8Bytes).through(utf8DecodeC).toList == l0) } diff --git a/core/shared/src/test/scala/fs2/concurrent/BroadcastSpec.scala b/core/shared/src/test/scala/fs2/concurrent/BroadcastSpec.scala index 404e9ee802..5faecfb0bb 100644 --- a/core/shared/src/test/scala/fs2/concurrent/BroadcastSpec.scala +++ b/core/shared/src/test/scala/fs2/concurrent/BroadcastSpec.scala @@ -13,9 +13,7 @@ class BroadcastSpec extends Fs2Spec { val expect = source.compile.toVector.map(_.toString) def pipe(idx: Int): Pipe[IO, Int, (Int, String)] = - _.map { i => - (idx, i.toString) - } + _.map(i => (idx, i.toString)) source .broadcastThrough((0 until concurrent).map(idx => pipe(idx)): _*) diff --git a/core/shared/src/test/scala/fs2/concurrent/SignalSpec.scala b/core/shared/src/test/scala/fs2/concurrent/SignalSpec.scala index 9ee945adc4..971557f727 100644 --- a/core/shared/src/test/scala/fs2/concurrent/SignalSpec.scala +++ b/core/shared/src/test/scala/fs2/concurrent/SignalSpec.scala @@ -17,9 +17,7 @@ class SignalSpec extends Fs2Spec { "SignallingRef" - { "get/set/discrete" in { forAll { (vs0: List[Long]) => - val vs = vs0.map { n => - if (n == 0) 1 else n - } + val vs = vs0.map(n => if (n == 0) 1 else n) SignallingRef[IO, Long](0L).flatMap { s => Ref.of[IO, Long](0).flatMap { r => val publisher = s.discrete.evalMap(r.set) diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala index 733b28010a..f546993367 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSpec.scala @@ -54,10 +54,8 @@ class TopicSpec extends Fs2Spec { val subscriber = topic .subscribe(1) .take(count + 1) - .flatMap { is => - Stream.eval(signal.get).map(is -> _) - } - .fold(Vector.empty[(Int, Int)]) { _ :+ _ } + .flatMap(is => Stream.eval(signal.get).map(is -> _)) + .fold(Vector.empty[(Int, Int)])(_ :+ _) Stream .range(0, subs) diff --git a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala index 23017a4138..a8f8f9a502 100644 --- a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala @@ -342,9 +342,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, blocker: Blocker def write(bytes: Chunk[Byte], timeout: Option[FiniteDuration]): F[Unit] = write0(bytes, timeout) def writes(timeout: Option[FiniteDuration]): Pipe[F, Byte, Unit] = - _.chunks.flatMap { bs => - Stream.eval(write(bs, timeout)) - } + _.chunks.flatMap(bs => Stream.eval(write(bs, timeout))) def localAddress: F[SocketAddress] = blocker.delay(ch.getLocalAddress) diff --git a/io/src/main/scala/fs2/io/tls/TLSContext.scala b/io/src/main/scala/fs2/io/tls/TLSContext.scala index f53dd34de4..c607225a15 100644 --- a/io/src/main/scala/fs2/io/tls/TLSContext.scala +++ b/io/src/main/scala/fs2/io/tls/TLSContext.scala @@ -109,9 +109,7 @@ object TLSContext { logger ) ) - .flatMap { engine => - TLSSocket(socket, engine) - } + .flatMap(engine => TLSSocket(socket, engine)) def dtlsClient[F[_]: Concurrent: ContextShift]( socket: udp.Socket[F], @@ -157,9 +155,7 @@ object TLSContext { logger ) ) - .flatMap { engine => - DTLSSocket(socket, remoteAddress, engine) - } + .flatMap(engine => DTLSSocket(socket, remoteAddress, engine)) private def engine[F[_]: Concurrent: ContextShift]( blocker: Blocker, diff --git a/io/src/main/scala/fs2/io/tls/TLSEngine.scala b/io/src/main/scala/fs2/io/tls/TLSEngine.scala index d9fd3b5d9f..ee74e11be3 100644 --- a/io/src/main/scala/fs2/io/tls/TLSEngine.scala +++ b/io/src/main/scala/fs2/io/tls/TLSEngine.scala @@ -66,9 +66,7 @@ private[tls] object TLSEngine { private def doWrap(binding: Binding[F]): F[Unit] = wrapBuffer .perform(engine.wrap(_, _)) - .flatTap { result => - log(s"doWrap result: $result") - } + .flatTap(result => log(s"doWrap result: $result")) .flatMap { result => result.getStatus match { case SSLEngineResult.Status.OK => @@ -104,9 +102,7 @@ private[tls] object TLSEngine { private def doUnwrap(binding: Binding[F]): F[Option[Chunk[Byte]]] = unwrapBuffer .perform(engine.unwrap(_, _)) - .flatTap { result => - log(s"unwrap result: $result") - } + .flatTap(result => log(s"unwrap result: $result")) .flatMap { result => result.getStatus match { case SSLEngineResult.Status.OK => @@ -183,9 +179,7 @@ private[tls] object TLSEngine { private def doHsWrap(binding: Binding[F]): F[Unit] = wrapBuffer .perform(engine.wrap(_, _)) - .flatTap { result => - log(s"doHsWrap result: $result") - } + .flatTap(result => log(s"doHsWrap result: $result")) .flatMap { result => result.getStatus match { case SSLEngineResult.Status.OK | SSLEngineResult.Status.BUFFER_UNDERFLOW => @@ -208,9 +202,7 @@ private[tls] object TLSEngine { private def doHsUnwrap(binding: Binding[F]): F[Unit] = unwrapBuffer .perform(engine.unwrap(_, _)) - .flatTap { result => - log(s"doHsUnwrap result: $result") - } + .flatTap(result => log(s"doHsUnwrap result: $result")) .flatMap { result => result.getStatus match { case SSLEngineResult.Status.OK => diff --git a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala index 7dc8bb17e1..3d55bc851f 100644 --- a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala @@ -75,7 +75,8 @@ private[udp] object AsynchronousSocketGroup { } private class Attachment( - readers: ArrayDeque[(Either[Throwable, Packet] => Unit, Option[Timeout])] = new ArrayDeque(), + readers: ArrayDeque[(Either[Throwable, Packet] => Unit, Option[Timeout])] = + new ArrayDeque(), writers: ArrayDeque[((WriterPacket, Option[Throwable] => Unit), Option[Timeout])] = new ArrayDeque() ) { @@ -169,7 +170,7 @@ private[udp] object AsynchronousSocketGroup { val attachment = new Attachment() key = channel.register(selector, 0, attachment) latch.countDown - } { latch.countDown } + }(latch.countDown) latch.await if (key eq null) throw new ClosedChannelException() key @@ -192,15 +193,11 @@ private[udp] object AsynchronousSocketGroup { } if (attachment.hasReaders) { cancelReader = attachment.queueReader(cb, t) - t.foreach { t => - pendingTimeouts += t - } + t.foreach(t => pendingTimeouts += t) } else { if (!read1(channel, cb)) { cancelReader = attachment.queueReader(cb, t) - t.foreach { t => - pendingTimeouts += t - } + t.foreach(t => pendingTimeouts += t) try { key.interestOps(key.interestOps | SelectionKey.OP_READ); () } catch { @@ -208,7 +205,7 @@ private[udp] object AsynchronousSocketGroup { } } } - } { cb(Left(new ClosedChannelException)) } + }(cb(Left(new ClosedChannelException))) private def read1( channel: DatagramChannel, @@ -262,15 +259,11 @@ private[udp] object AsynchronousSocketGroup { } if (attachment.hasWriters) { cancelWriter = attachment.queueWriter((writerPacket, cb), t) - t.foreach { t => - pendingTimeouts += t - } + t.foreach(t => pendingTimeouts += t) } else { if (!write1(channel, writerPacket, cb)) { cancelWriter = attachment.queueWriter((writerPacket, cb), t) - t.foreach { t => - pendingTimeouts += t - } + t.foreach(t => pendingTimeouts += t) try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () } catch { @@ -278,7 +271,7 @@ private[udp] object AsynchronousSocketGroup { } } } - } { cb(Some(new ClosedChannelException)) } + }(cb(Some(new ClosedChannelException))) } private def write1( @@ -307,10 +300,10 @@ private[udp] object AsynchronousSocketGroup { key.cancel channel.close attachment.close - } { () } + }(()) override def close(): Unit = - closeLock.synchronized { closed = true } + closeLock.synchronized { closed = true; () } private def onSelectorThread(f: => Unit)(ifClosed: => Unit): Unit = closeLock.synchronized { diff --git a/io/src/main/scala/fs2/io/udp/SocketGroup.scala b/io/src/main/scala/fs2/io/udp/SocketGroup.scala index 3bf278b77a..9ed09f2314 100644 --- a/io/src/main/scala/fs2/io/udp/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/SocketGroup.scala @@ -47,9 +47,7 @@ final class SocketGroup( )(implicit F: Concurrent[F], CS: ContextShift[F]): Resource[F, Socket[F]] = { val mkChannel = blocker.delay { val channel = protocolFamily - .map { pf => - DatagramChannel.open(pf) - } + .map(pf => DatagramChannel.open(pf)) .getOrElse(DatagramChannel.open()) channel.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, reuseAddress) sendBufferSize.foreach { sz => @@ -98,13 +96,13 @@ final class SocketGroup( def writes(timeout: Option[FiniteDuration]): Pipe[F, Packet, Unit] = _.flatMap(p => Stream.eval(write(p, timeout))) - def close: F[Unit] = blocker.delay { asg.close(ctx) } + def close: F[Unit] = blocker.delay(asg.close(ctx)) def join(group: InetAddress, interface: NetworkInterface): F[AnySourceGroupMembership] = blocker.delay { val membership = channel.join(group, interface) new AnySourceGroupMembership { - def drop = blocker.delay { membership.drop } + def drop = blocker.delay(membership.drop) def block(source: InetAddress) = F.delay { membership.block(source); () } @@ -122,7 +120,7 @@ final class SocketGroup( ): F[GroupMembership] = F.delay { val membership = channel.join(group, interface, source) new GroupMembership { - def drop = blocker.delay { membership.drop } + def drop = blocker.delay(membership.drop) override def toString = "GroupMembership" } } diff --git a/io/src/test/scala/fs2/io/JavaInputOutputStreamSpec.scala b/io/src/test/scala/fs2/io/JavaInputOutputStreamSpec.scala index 15cb5839b3..925678b444 100644 --- a/io/src/test/scala/fs2/io/JavaInputOutputStreamSpec.scala +++ b/io/src/test/scala/fs2/io/JavaInputOutputStreamSpec.scala @@ -39,18 +39,18 @@ class JavaInputOutputStreamSpec extends Fs2Spec with EventuallySupport { pending // https://github.com/functional-streams-for-scala/fs2/issues/1063 var closed: Boolean = false val s: Stream[IO, Byte] = - Stream(1.toByte).onFinalize(IO { closed = true }) + Stream(1.toByte).onFinalize(IO { closed = true; () }) toInputStreamResource(s).use(_ => IO.unit).unsafeRunSync() - eventually { assert(closed) } + eventually(assert(closed)) } "upstream.is.force-closed" in { pending // https://github.com/functional-streams-for-scala/fs2/issues/1063 var closed: Boolean = false val s: Stream[IO, Byte] = - Stream(1.toByte).onFinalize(IO { closed = true }) + Stream(1.toByte).onFinalize(IO { closed = true; () }) val result = toInputStreamResource(s) @@ -71,9 +71,7 @@ class JavaInputOutputStreamSpec extends Fs2Spec with EventuallySupport { .map(_.toByte) .covary[IO] .through(toInputStream) - .map { is => - Vector.fill(257)(is.read()) - } + .map(is => Vector.fill(257)(is.read())) .compile .toVector .map(_.flatten) diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index 415f31fa94..8ad2efafa9 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -397,9 +397,7 @@ class FileSpec extends BaseFileSpec { .resource(Blocker[IO]) .flatMap { blocker => tempDirectory - .flatMap { path => - file.directoryStream[IO](blocker, path) - } + .flatMap(path => file.directoryStream[IO](blocker, path)) } .compile .toList @@ -469,9 +467,7 @@ class FileSpec extends BaseFileSpec { .resource(Blocker[IO]) .flatMap { blocker => tempFilesHierarchy - .flatMap { topDir => - file.walk[IO](blocker, topDir) - } + .flatMap(topDir => file.walk[IO](blocker, topDir)) } .compile .toList diff --git a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala index c55c7793ff..3fd1a14b98 100644 --- a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala +++ b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala @@ -73,7 +73,7 @@ class SocketSpec extends Fs2Spec { .unsafeRunTimed(timeout) .get assert(result.size == clientCount) - assert(result.map { new String(_) }.toSet == Set("fs2.rocks")) + assert(result.map(new String(_)).toSet == Set("fs2.rocks")) } // Ensure that readN yields chunks of the requested size diff --git a/io/src/test/scala/fs2/io/tls/DTLSSocketSpec.scala b/io/src/test/scala/fs2/io/tls/DTLSSocketSpec.scala index 28a52cd325..d120af752e 100644 --- a/io/src/test/scala/fs2/io/tls/DTLSSocketSpec.scala +++ b/io/src/test/scala/fs2/io/tls/DTLSSocketSpec.scala @@ -33,9 +33,7 @@ class DTLSSocketSpec extends TLSSpec { val echoServer = dtlsServerSocket .reads(None) - .evalMap { p => - dtlsServerSocket.write(p, None) - } + .evalMap(p => dtlsServerSocket.write(p, None)) .drain val msg = Chunk.bytes("Hello, world!".getBytes) val echoClient = Stream.sleep_(500.milliseconds) ++ Stream.eval_( diff --git a/io/src/test/scala/fs2/io/udp/UdpSpec.scala b/io/src/test/scala/fs2/io/udp/UdpSpec.scala index 4048e07c01..e80d07a40a 100644 --- a/io/src/test/scala/fs2/io/udp/UdpSpec.scala +++ b/io/src/test/scala/fs2/io/udp/UdpSpec.scala @@ -29,13 +29,11 @@ class UdpSpec extends Fs2Spec { Stream .resource(socketGroup.open[IO]()) .flatMap { serverSocket => - Stream.eval(serverSocket.localAddress).map { _.getPort }.flatMap { serverPort => + Stream.eval(serverSocket.localAddress).map(_.getPort).flatMap { serverPort => val serverAddress = new InetSocketAddress("localhost", serverPort) val server = serverSocket .reads() - .evalMap { packet => - serverSocket.write(packet) - } + .evalMap(packet => serverSocket.write(packet)) .drain val client = Stream.resource(socketGroup.open[IO]()).flatMap { clientSocket => Stream(Packet(serverAddress, msg)) @@ -52,9 +50,7 @@ class UdpSpec extends Fs2Spec { } "echo lots" in { - val msgs = (1 to 20).toVector.map { n => - Chunk.bytes(("Hello, world! " + n).getBytes) - } + val msgs = (1 to 20).toVector.map(n => Chunk.bytes(("Hello, world! " + n).getBytes)) val numClients = 50 val numParallelClients = 10 mkSocketGroup @@ -62,19 +58,15 @@ class UdpSpec extends Fs2Spec { Stream .resource(socketGroup.open[IO]()) .flatMap { serverSocket => - Stream.eval(serverSocket.localAddress).map { _.getPort }.flatMap { serverPort => + Stream.eval(serverSocket.localAddress).map(_.getPort).flatMap { serverPort => val serverAddress = new InetSocketAddress("localhost", serverPort) val server = serverSocket .reads() - .evalMap { packet => - serverSocket.write(packet) - } + .evalMap(packet => serverSocket.write(packet)) .drain val client = Stream.resource(socketGroup.open[IO]()).flatMap { clientSocket => Stream - .emits(msgs.map { msg => - Packet(serverAddress, msg) - }) + .emits(msgs.map(msg => Packet(serverAddress, msg))) .flatMap { msg => Stream.eval_(clientSocket.write(msg)) ++ Stream.eval(clientSocket.read()) } @@ -113,7 +105,7 @@ class UdpSpec extends Fs2Spec { ) ) .flatMap { serverSocket => - Stream.eval(serverSocket.localAddress).map { _.getPort }.flatMap { serverPort => + Stream.eval(serverSocket.localAddress).map(_.getPort).flatMap { serverPort => val v4Interfaces = NetworkInterface.getNetworkInterfaces.asScala.toList.filter { interface => interface.getInetAddresses.asScala.exists(_.isInstanceOf[Inet4Address]) @@ -122,9 +114,7 @@ class UdpSpec extends Fs2Spec { .eval_(v4Interfaces.traverse(interface => serverSocket.join(group, interface))) ++ serverSocket .reads() - .evalMap { packet => - serverSocket.write(packet) - } + .evalMap(packet => serverSocket.write(packet)) .drain val client = Stream.resource(socketGroup.open[IO]()).flatMap { clientSocket => Stream(Packet(new InetSocketAddress(group, serverPort), msg)) @@ -145,9 +135,7 @@ class UdpSpec extends Fs2Spec { .flatMap { socketGroup => Stream .resource(socketGroup.open[IO]()) - .flatMap { socket => - socket.reads(timeout = Some(50.millis)) - } + .flatMap(socket => socket.reads(timeout = Some(50.millis))) } .compile .drain diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala index 952b61c153..1f93fc0d34 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala @@ -140,9 +140,7 @@ object StreamSubscriber { def onComplete: F[Unit] = nextState(OnComplete) def onFinalize: F[Unit] = nextState(OnFinalize) def dequeue1: F[Either[Throwable, Option[A]]] = - Deferred[F, Out].flatMap { p => - ref.modify(step(OnDequeue(p))).flatten >> p.get - } + Deferred[F, Out].flatMap(p => ref.modify(step(OnDequeue(p))).flatten >> p.get) } } } diff --git a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala index 8e56874688..cccf8f3fa3 100644 --- a/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala +++ b/reactive-streams/src/main/scala/fs2/interop/reactivestreams/package.scala @@ -36,9 +36,7 @@ package object reactivestreams { def fromPublisher[F[_]: ConcurrentEffect, A](p: Publisher[A]): Stream[F, A] = Stream .eval(StreamSubscriber[F, A]) - .evalTap { s => - Sync[F].delay(p.subscribe(s)) - } + .evalTap(s => Sync[F].delay(p.subscribe(s))) .flatMap(_.sub.stream) implicit final class PublisherOps[A](val publisher: Publisher[A]) extends AnyVal { diff --git a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala index f21e763fa9..00d9ad2849 100644 --- a/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala +++ b/reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala @@ -33,9 +33,7 @@ final class SubscriberWhiteboxSpec p: SubscriberWhiteboxVerification.WhiteboxSubscriberProbe[Int] ): Subscriber[Int] = StreamSubscriber[IO, Int] - .map { s => - new WhiteboxSubscriber(s, p) - } + .map(s => new WhiteboxSubscriber(s, p)) .unsafeRunSync() def createElement(i: Int): Int = counter.getAndIncrement