Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update formatting with latest Scala Steward #1796

Merged
merged 5 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FreeCBenchmark {
self.viewL match {
case Result.Pure(r) => F.pure(Some(r))
case Result.Fail(e) => F.raiseError(e)
case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None)) { F.raiseError }
case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None))(F.raiseError)
case _ @ViewL.View(_) => F.raiseError(new RuntimeException("Never get here)"))
}
}
16 changes: 7 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val scaladocSettings = Seq(
"-implicits-sound-shadowing",
"-implicits-show-all"
),
scalacOptions in (Compile, doc) ~= { _.filterNot { _ == "-Xfatal-warnings" } },
scalacOptions in (Compile, doc) ~= { _.filterNot(_ == "-Xfatal-warnings") },
autoAPIMappings := true
)

Expand All @@ -140,16 +140,16 @@ lazy val publishingSettings = Seq(
password <- Option(System.getenv().get("SONATYPE_PASSWORD"))
} yield Credentials("Sonatype Nexus Repository Manager", "oss.sonatype.org", username, password)).toSeq,
publishMavenStyle := true,
pomIncludeRepository := { _ =>
false
},
pomIncludeRepository := { _ => false },
pomExtra := {
<developers>
{for ((username, name) <- contributors) yield <developer>
{
for ((username, name) <- contributors) yield <developer>
<id>{username}</id>
<name>{name}</name>
<url>http://github.com/{username}</url>
</developer>}
</developer>
}
</developers>
},
pomPostProcess := { node =>
Expand All @@ -159,9 +159,7 @@ lazy val publishingSettings = Seq(
override def transform(n: Node) =
if (f(n)) NodeSeq.Empty else n
}
val stripTestScope = stripIf { n =>
n.label == "dependency" && (n \ "scope").text == "test"
}
val stripTestScope = stripIf(n => n.label == "dependency" && (n \ "scope").text == "test")
new RuleTransformer(stripTestScope).transform(node)(0)
},
gpgWarnOnFailure := Option(System.getenv().get("GPG_WARN_ON_FAILURE")).isDefined
Expand Down
3 changes: 2 additions & 1 deletion core/jvm/src/main/scala/fs2/compress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ object compress {
def push(chunk: Chunk[Byte]): Unit = {
val arr: Array[Byte] = {
val buf = new Array[Byte](chunk.size)
chunk.copyToArray(buf) // Note: we can be slightly better than this for Chunk.Bytes if we track incoming offsets in abis
// Note: we can be slightly better than this for Chunk.Bytes if we track incoming offsets in abis
chunk.copyToArray(buf)
buf
}
val pushed = abis.push(arr)
Expand Down
4 changes: 1 addition & 3 deletions core/jvm/src/main/scala/fs2/hash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ object hash {
d.update(bytes.values, bytes.offset, bytes.size)
d
}
.flatMap { d =>
Stream.chunk(Chunk.bytes(d.digest()))
}
.flatMap(d => Stream.chunk(Chunk.bytes(d.digest())))
}
}
27 changes: 8 additions & 19 deletions core/jvm/src/test/scala/fs2/HashSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,20 @@ class HashSpec extends Fs2Spec {
else
str.getBytes
.grouped(n)
.foldLeft(Stream.empty.covaryOutput[Byte])((acc, c) => acc ++ Stream.chunk(Chunk.bytes(c))
.foldLeft(Stream.empty.covaryOutput[Byte])((acc, c) =>
acc ++ Stream.chunk(Chunk.bytes(c))
)

assert(s.through(h).toList == digest(algo, str))
}

"digests" - {
"md2" in forAll { (s: String) =>
checkDigest(md2, "MD2", s)
}
"md5" in forAll { (s: String) =>
checkDigest(md5, "MD5", s)
}
"sha1" in forAll { (s: String) =>
checkDigest(sha1, "SHA-1", s)
}
"sha256" in forAll { (s: String) =>
checkDigest(sha256, "SHA-256", s)
}
"sha384" in forAll { (s: String) =>
checkDigest(sha384, "SHA-384", s)
}
"sha512" in forAll { (s: String) =>
checkDigest(sha512, "SHA-512", s)
}
"md2" in forAll((s: String) => checkDigest(md2, "MD2", s))
"md5" in forAll((s: String) => checkDigest(md5, "MD5", s))
"sha1" in forAll((s: String) => checkDigest(sha1, "SHA-1", s))
"sha256" in forAll((s: String) => checkDigest(sha256, "SHA-256", s))
"sha384" in forAll((s: String) => checkDigest(sha384, "SHA-384", s))
"sha512" in forAll((s: String) => checkDigest(sha512, "SHA-512", s))
}

"empty input" in {
Expand Down
18 changes: 4 additions & 14 deletions core/jvm/src/test/scala/fs2/MemorySanityChecks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ object DanglingDequeueSanityTest extends App {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global)
Stream
.eval(Queue.unbounded[IO, Int])
.flatMap { q =>
Stream.constant(1).flatMap { _ =>
Stream.empty.mergeHaltBoth(q.dequeue)
}
}
.flatMap(q => Stream.constant(1).flatMap(_ => Stream.empty.mergeHaltBoth(q.dequeue)))
.compile
.drain
.unsafeRunSync
Expand All @@ -113,9 +109,7 @@ object AwakeEverySanityTest extends App {
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global)
Stream
.awakeEvery[IO](1.millis)
.flatMap { _ =>
Stream.eval(IO(()))
}
.flatMap(_ => Stream.eval(IO(())))
.compile
.drain
.unsafeRunSync
Expand All @@ -125,9 +119,7 @@ object SignalDiscreteSanityTest extends App {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global)
Stream
.eval(SignallingRef[IO, Unit](()))
.flatMap { signal =>
signal.discrete.evalMap(a => signal.set(a))
}
.flatMap(signal => signal.discrete.evalMap(a => signal.set(a)))
.compile
.drain
.unsafeRunSync
Expand All @@ -137,9 +129,7 @@ object SignalContinuousSanityTest extends App {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global)
Stream
.eval(SignallingRef[IO, Unit](()))
.flatMap { signal =>
signal.continuous.evalMap(a => signal.set(a))
}
.flatMap(signal => signal.continuous.evalMap(a => signal.set(a)))
.compile
.drain
.unsafeRunSync
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/Hotswap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ object Hotswap {
Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {
override def swap(next: Resource[F, R]): F[R] =
(next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579
.allocated
// workaround for https://github.com/typelevel/cats-effect/issues/579
(next <* ().pure[Resource[F, ?]]).allocated
.continual { r => // this whole block is inside continual and cannot be canceled
Sync[F].fromEither(r).flatMap {
case (newValue, newFinalizer) =>
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ object Pull extends PullLowPriority {
* Halts when a step terminates with `None` or `Pull.raiseError`.
*/
def loop[F[_], O, R](using: R => Pull[F, O, Option[R]]): R => Pull[F, O, Option[R]] =
r => using(r).flatMap { _.map(loop(using)).getOrElse(Pull.pure(None)) }
r => using(r).flatMap(_.map(loop(using)).getOrElse(Pull.pure(None)))

/** Outputs a single value. */
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] =
Expand Down
46 changes: 16 additions & 30 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1010,16 +1010,12 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Unit] =
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output1(o) >> go(o, tl)
}
Pull.eval(f(z, hd)).flatMap(o => Pull.output1(o) >> go(o, tl))
case None => Pull.done
}
this.pull.uncons1.flatMap {
case Some((hd, tl)) =>
Pull.eval(f(z, hd)).flatMap { o =>
Pull.output(Chunk.seq(List(z, o))) >> go(o, tl)
}
Pull.eval(f(z, hd)).flatMap(o => Pull.output(Chunk.seq(List(z, o))) >> go(o, tl))
case None => Pull.output1(z)
}.stream
}
Expand Down Expand Up @@ -1466,9 +1462,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
// this is the same if in the resize function,
// short circuited to avoid needlessly converting newAcc.toChunk
if (newAcc.size < n) {
Stream.empty ++ startTimeout.flatMap { newTimeout =>
go(newAcc, newTimeout)
}
Stream.empty ++ startTimeout.flatMap(newTimeout => go(newAcc, newTimeout))
} else {
val (toEmit, rest) = resize(newAcc.toChunk, Stream.empty)
toEmit ++ startTimeout.flatMap { newTimeout =>
Expand All @@ -1481,9 +1475,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
}

startTimeout
.flatMap { t =>
go(Chunk.Queue.empty, t).concurrently(producer)
}
.flatMap(t => go(Chunk.Queue.empty, t).concurrently(producer))
.onFinalize {
currentTimeout.modify {
case (cancelInFlightTimeout, _) =>
Expand Down Expand Up @@ -1539,9 +1531,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
)(implicit F: Concurrent[F2]): Resource[F2, Signal[F2, O2]] =
Stream
.eval(SignallingRef[F2, O2](initial))
.flatMap { sig =>
Stream(sig).concurrently(evalMap(sig.set))
}
.flatMap(sig => Stream(sig).concurrently(evalMap(sig.set)))
.compile
.resource
.lastOrError
Expand Down Expand Up @@ -1619,7 +1609,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])

Stream.bracket(F2.start(runR))(_ =>
interruptR.complete(()) >>
doneR.get.flatMap { F2.fromEither }
doneR.get.flatMap(F2.fromEither)
) >> this.interruptWhen(interruptL.get.attempt)
}
}
Expand All @@ -1643,9 +1633,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
)(implicit F2: Concurrent[F2]): Stream[F2, O] =
Stream
.getScope[F2]
.flatMap { scope =>
Stream.supervise(haltOnSignal.flatMap(scope.interrupt)) >> this
}
.flatMap(scope => Stream.supervise(haltOnSignal.flatMap(scope.interrupt)) >> this)
.interruptScope

/**
Expand Down Expand Up @@ -1914,9 +1902,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
interrupt.complete(()).attempt.void // we need to attempt interruption in case the interrupt was already completed.
else
otherSideDone
.modify { prev =>
(true, prev)
}
.modify(prev => (true, prev))
.flatMap { otherDone =>
if (otherDone)
resultQ
Expand Down Expand Up @@ -2170,7 +2156,9 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
F2.start {
inner.chunks
.evalMap(s => outputQ.enqueue1(Some(s)))
.interruptWhen(done.map(_.nonEmpty)) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enqueue last item while being interrupted
.interruptWhen(
done.map(_.nonEmpty)
) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enqueue last item while being interrupted
.compile
.drain
.attempt
Expand All @@ -2197,9 +2185,7 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
def runOuter: F2[Unit] =
outer
.flatMap { inner =>
Stream.getScope[F2].evalMap { outerScope =>
runInner(inner, outerScope)
}
Stream.getScope[F2].evalMap(outerScope => runInner(inner, outerScope))
}
.interruptWhen(done.map(_.nonEmpty))
.compile
Expand Down Expand Up @@ -4059,7 +4045,7 @@ object Stream extends StreamLowPriority {

/** Like `[[unconsN]]`, but leaves the buffered input unconsumed. */
def fetchN(n: Int): Pull[F, INothing, Option[Stream[F, O]]] =
unconsN(n).map { _.map { case (hd, tl) => tl.cons(hd) } }
unconsN(n).map(_.map { case (hd, tl) => tl.cons(hd) })

/** Awaits the next available element where the predicate returns true. */
def find(f: O => Boolean): Pull[F, INothing, Option[(O, Stream[F, O])]] =
Expand Down Expand Up @@ -4640,9 +4626,9 @@ object Stream extends StreamLowPriority {
*/
def stream: Stream[F, O] =
Pull
.loop[F, O, StepLeg[F, O]] { leg =>
Pull.output(leg.head).flatMap(_ => leg.stepLeg)
}(self.setHead(Chunk.empty))
.loop[F, O, StepLeg[F, O]](leg => Pull.output(leg.head).flatMap(_ => leg.stepLeg))(
self.setHead(Chunk.empty)
)
.void
.stream

Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/fs2/concurrent/Balance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ object Balance {
private def strategy[O]: PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] =
new PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] {
def initial: Option[Chunk[O]] =
Some(Chunk.empty) // causes to block first push, hence all the other chunks must be non-empty.
// causes to block first push, hence all the other chunks must be non-empty.
Some(Chunk.empty)

def accepts(i: Chunk[O], state: Option[Chunk[O]]): Boolean =
state.isEmpty
Expand Down
7 changes: 4 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Broadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ object Broadcast {
def awaitSub = false
def isEmpty = true
}

case class Processing[O](
subscribers: Set[Token],
processing: Set[Token], // added when we enter to Processing state, and removed whenever sub takes current `O`
remains: Set[Token], // removed when subscriber requests another `O` but already seen `current`
// added when we enter to Processing state, and removed whenever sub takes current `O`
processing: Set[Token],
// removed when subscriber requests another `O` but already seen `current`
remains: Set[Token],
current: O
) extends State[O] {
def awaitSub = false
Expand Down
18 changes: 4 additions & 14 deletions core/shared/src/main/scala/fs2/concurrent/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,11 @@ private[fs2] object PubSub {
def clearPublisher(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] = exitCase match {
case ExitCase.Completed => Applicative[F].unit
case ExitCase.Error(_) | ExitCase.Canceled =>
state.update { ps =>
ps.copy(publishers = ps.publishers.filterNot(_.token == token))
}
state.update(ps => ps.copy(publishers = ps.publishers.filterNot(_.token == token)))
}

def clearSubscriber(token: Token): F[Unit] =
state.update { ps =>
ps.copy(subscribers = ps.subscribers.filterNot(_.token == token))
}
state.update(ps => ps.copy(subscribers = ps.subscribers.filterNot(_.token == token)))

def clearSubscriberOnCancel(token: Token)(exitCase: ExitCase[Throwable]): F[Unit] =
exitCase match {
Expand Down Expand Up @@ -456,11 +452,7 @@ private[fs2] object PubSub {
Some(strategy.initial)

def accepts(i: Option[I], state: Option[S]): Boolean =
i.forall { el =>
state.exists { s =>
strategy.accepts(el, s)
}
}
i.forall(el => state.exists(s => strategy.accepts(el, s)))

def publish(i: Option[I], state: Option[S]): Option[S] =
i match {
Expand All @@ -487,9 +479,7 @@ private[fs2] object PubSub {
(Some(s1), success)
}
def unsubscribe(selector: Sel, state: Option[S]): Option[S] =
state.map { s =>
strategy.unsubscribe(selector, s)
}
state.map(s => strategy.unsubscribe(selector, s))
}

/**
Expand Down
Loading