Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blockTimer and rename to blockShift #19

Merged
merged 12 commits into from
May 31, 2018
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object ThreadNameExample {
.map(Linebacker.fromExecutorService[IO](_)) // Create Linebacker From Executor
.flatMap { implicit linebacker => // Raise Implicitly
Stream.eval(
Linebacker[IO].block(getThread) // Block On Linebacker Pool Not Global
Linebacker[IO].blockShift(getThread) // Block On Linebacker Pool Not Global
) ++
Stream.eval(getThread) // Running On Global
}
Expand Down
7 changes: 3 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ lazy val commonSettings = Seq(
organization := "io.chrisdavenport",
scalaVersion := "2.12.6",
crossScalaVersions := Seq(scalaVersion.value, "2.11.12"),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.6" cross CrossVersion.binary),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.7" cross CrossVersion.binary),
scalafmtOnCompile := true,
scalafmtTestOnCompile := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "0.10.1",
"co.fs2" %% "fs2-core" % "0.10.3",
"org.specs2" %% "specs2-core" % "4.0.3" % Test
"org.typelevel" %% "cats-effect" % "1.0.0-RC2-d7181dc",
"org.specs2" %% "specs2-core" % "4.2.0" % Test
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext
Expand All @@ -11,12 +10,7 @@ trait DualContext[F[_]] {
def defaultContext: ExecutionContext

def block[A](fa: F[A])(implicit F: Async[F]): F[A] =
for {
_ <- Async.shift(blockingContext)
aE <- fa.attempt
_ <- Async.shift(defaultContext)
a <- Applicative[F].pure(aE).rethrow
} yield a
F.bracket(Async.shift[F](blockingContext))(_ => fa)(_ => Async.shift[F](defaultContext))
}

object DualContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect.Async
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext

trait Linebacker[F[_]] {

def blockingPool: ExecutionContext
def blockingContext: ExecutionContext

@deprecated("0.2.0", "Use blockingContext instead.")
private[linebacker] def blockingPool: ExecutionContext = blockingContext

@deprecated("0.2.0", "Use blockShift instead.")
final private[linebacker] def block[A](

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this at all when RC2 isn't compatible with cats 0.10?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just because I got trolled, what's the point in only deprecating and just just outright removing when you don't need bincompat for 0.2 yet?

fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
blockShift(fa)(F, ec)

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the given implicit execution context
* after the Async `F[A]` is evaluated.
*/
final def block[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
for {
_ <- Async.shift(blockingPool)
eA <- fa.attempt
_ <- Async.shift(ec)
a <- Applicative[F].pure(eA).rethrow
} yield a
final def blockShift[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
F.bracket(Async.shift[F](blockingContext))(_ => fa)(_ => Async.shift[F](ec))

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the F for the timer.
*/
final def blockTimer[A](fa: F[A])(implicit F: Async[F], timer: Timer[F]): F[A] =
F.bracket(Async.shift[F](blockingContext))(_ => fa)(_ => timer.shift)
}

object Linebacker {
def apply[F[_]](implicit ev: Linebacker[F]): Linebacker[F] = ev

def fromExecutorService[F[_]](es: ExecutorService): Linebacker[F] = new Linebacker[F] {
def blockingPool = ExecutionContext.fromExecutorService(es)
def blockingContext = ExecutionContext.fromExecutorService(es)
}
def fromExecutionContext[F[_]](ec: ExecutionContext): Linebacker[F] = new Linebacker[F] {
def blockingPool = ec
def blockingContext = ec
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.chrisdavenport.linebacker

import cats._
import cats.effect.Async
import cats.implicits._
import scala.concurrent.ExecutionContext
Expand All @@ -23,11 +22,8 @@ trait Quarterback[F[_], K] {
def fleaFlicker[A](fa: F[A], initial: K, end: K)(implicit F: Async[F]): F[A] =
for {
iEC <- select(initial)
_ <- Async.shift(iEC)
aE <- fa.attempt
eEC <- select(end)
_ <- Async.shift(eEC)
a <- Applicative[F].pure(aE).rethrow
endEC <- select(end)
a <- Async[F].bracket(Async.shift[F](iEC))(_ => fa)(_ => Async.shift(endEC))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is like the third time I see this. I think this is pointing to the need of either some package object or object definition of something like:

private[linebacker] final def dualShift[F[_], A](initialEc: ExecutionContext, endEc: ExecutionContext, action: F[A])(implicit F: Async[F]) = 
  F.bracket(Async.shift[F](initialEc))(_ => fa)(_ => Async.shift[F](encEc))

Then just calling dualShift(ec1, ec2, fa) in different modules and you save all the copypasta.

} yield a
}
object Quarterback {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.chrisdavenport.linebacker.contexts

import cats.effect.Sync
import cats.effect.{Resource, Sync}
import cats.implicits._
import fs2.Stream
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutorService, Executors => E, ForkJoinPool, ThreadFactory}

Expand All @@ -18,16 +17,16 @@ object Executors {
* resource with the same number of threads.
*/
def fixedPool[F[_]: Sync](n: Int) =
streamExecutorService(fixedPoolExecutorUnsafe(n))
executorServiceResource(fixedPoolExecutorUnsafe(n))

/**
* Constructs an unbound thread pool that will create
* a new thread for each submitted job. This is useful
* if you have a construct that is blocking but
* self-manages the number of threads you can consume.
*/
def unbound[F[_]: Sync]: Stream[F, ExecutorService] =
streamExecutorService(unboundExecutorUnsafe)
def unbound[F[_]: Sync]: Resource[F, ExecutorService] =
executorServiceResource(unboundExecutorUnsafe)

/**
* A work stealing pool is often a useful blocking
Expand All @@ -39,20 +38,18 @@ object Executors {
* requests or other work.
*/
def workStealingPool[F[_]: Sync](n: Int) =
streamExecutorService(workStealingPoolUnsafe(n))
executorServiceResource(workStealingPoolUnsafe(n))

/**
* Default Pool For Scala, optimized for forked work and then returning to a
* main pool, generally ideal for your main event loop.
*/
def forkJoinPool[F[_]: Sync](n: Int) =
streamExecutorService(forkJoinPoolUnsafe(n))
executorServiceResource(forkJoinPoolUnsafe(n))

private def streamExecutorService[F[_]: Sync](f: F[ExecutorService]): Stream[F, ExecutorService] =
Stream.bracket(f)(
_.pure[Stream[F, ?]],
es => Sync[F].delay(es.shutdownNow).void
)
private def executorServiceResource[F[_]: Sync](
f: F[ExecutorService]): Resource[F, ExecutorService] =
Resource.make[F, ExecutorService](f)(es => Sync[F].delay(es.shutdownNow).void)

object unsafe {
def unboundExecutorUnsafe[F[_]: Sync]: F[ExecutorService] = Sync[F].delay {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.chrisdavenport.linebacker

import org.specs2._
import cats.effect.IO
import cats.effect._
import cats.implicits._
import fs2.Stream
import java.lang.Thread
import scala.concurrent.ExecutionContext
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -17,17 +16,15 @@ class LinebackerSpec extends Spec {
"""

def runsOnLinebacker = {
E.unbound[IO]
val testRun = E
.unbound[IO]
.map(Linebacker.fromExecutorService[IO])
.flatMap { implicit linebacker =>
.use { implicit linebacker =>
import scala.concurrent.ExecutionContext.Implicits.global
Stream.eval(
Linebacker[IO].block(IO(Thread.currentThread().getName))
)
Linebacker[IO].blockShift(IO(Thread.currentThread().getName))
}
.compile
.last
.unsafeRunSync must_== Some("linebacker-thread-0")

testRun.unsafeRunSync must_=== "linebacker-thread-0"
}

def runsOffLinebackerAfterwards = {
Expand All @@ -46,14 +43,10 @@ class LinebackerSpec extends Spec {

implicit val linebacker = Linebacker.fromExecutionContext[IO](ec)

Stream
.eval(
Linebacker[IO].block(IO.unit) *>
IO(Thread.currentThread().getName)
<* IO(executor.shutdownNow)
)
.compile
.last
.unsafeRunSync must_== Some("test-ec-1")
val testRun = Linebacker[IO].blockShift(IO.unit) *>
IO(Thread.currentThread().getName) <*
IO(executor.shutdownNow)

testRun.unsafeRunSync must_=== "test-ec-1"
}
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.1.5
sbt.version=1.1.6
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.1.1-SNAPSHOT"
version in ThisBuild := "0.2.0-SNAPSHOT"