Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blockTimer and rename to blockShift #19

Merged
merged 12 commits into from
May 31, 2018
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: false
language: scala

scala:
- 2.12.4
- 2.12.6
- 2.11.12

jdk:
Expand All @@ -24,7 +24,7 @@ script:

after_success:
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && sbt ++$TRAVIS_SCALA_VERSION publish
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && test $TRAVIS_SCALA_VERSION == "2.12.4" && sbt microsite/publishMicrosite
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && test $TRAVIS_SCALA_VERSION == "2.12.6" && sbt microsite/publishMicrosite

cache:
directories:
Expand Down
21 changes: 9 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ First some imports

```tut:silent
import scala.concurrent.ExecutionContext.Implicits.global
import fs2.Stream
import cats.effect._
import cats.implicits._
import io.chrisdavenport.linebacker.Linebacker
Expand All @@ -29,20 +28,18 @@ Creating And Evaluating Pool Behavior
```tut
val getThread = IO(Thread.currentThread().getName)

object ThreadNameExample {
val checkRun = Executors.unbound[IO] // Create Executor
val checkRun = {
Executors.unbound[IO] // Create Executor
.map(Linebacker.fromExecutorService[IO](_)) // Create Linebacker From Executor
.flatMap { implicit linebacker => // Raise Implicitly
Stream.eval(
Linebacker[IO].block(getThread) // Block On Linebacker Pool Not Global
) ++
Stream.eval(getThread) // Running On Global
.use{ implicit linebacker => // Raise Implicitly
Linebacker[IO].blockEc(getThread) // Block On Linebacker Pool Not Global
.flatMap(threadName => IO(println(threadName))) >>
getThread // Running On Global
.flatMap(threadName => IO(println(threadName)))
}
.evalMap(threadName => IO(println(threadName)))
.compile
.drain
}
ThreadNameExample.checkRun.unsafeRunSync

checkRun.unsafeRunSync
```

Dual Contexts Are Also Very Useful
Expand Down
7 changes: 3 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ lazy val commonSettings = Seq(
organization := "io.chrisdavenport",
scalaVersion := "2.12.6",
crossScalaVersions := Seq(scalaVersion.value, "2.11.12"),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.6" cross CrossVersion.binary),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.7" cross CrossVersion.binary),
scalafmtOnCompile := true,
scalafmtTestOnCompile := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "0.10.1",
"co.fs2" %% "fs2-core" % "0.10.3",
"org.specs2" %% "specs2-core" % "4.0.3" % Test
"org.typelevel" %% "cats-effect" % "1.0.0-RC2-d7181dc",
"org.specs2" %% "specs2-core" % "4.2.0" % Test
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext
Expand All @@ -11,12 +10,7 @@ trait DualContext[F[_]] {
def defaultContext: ExecutionContext

def block[A](fa: F[A])(implicit F: Async[F]): F[A] =
for {
_ <- Async.shift(blockingContext)
aE <- fa.attempt
_ <- Async.shift(defaultContext)
a <- Applicative[F].pure(aE).rethrow
} yield a
dualShift(blockingContext, defaultContext, fa)
}

object DualContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect.Async
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext

trait Linebacker[F[_]] {

def blockingPool: ExecutionContext
def blockingContext: ExecutionContext

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the given implicit execution context
* after the Async `F[A]` is evaluated.
*/
final def block[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
for {
_ <- Async.shift(blockingPool)
eA <- fa.attempt
_ <- Async.shift(ec)
a <- Applicative[F].pure(eA).rethrow
} yield a
final def blockEc[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
dualShift(blockingContext, ec, fa)

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the F for the timer.
*/
final def blockTimer[A](fa: F[A])(implicit F: Async[F], timer: Timer[F]): F[A] =
F.bracket(Async.shift[F](blockingContext))(_ => fa)(_ => timer.shift)
}

object Linebacker {
def apply[F[_]](implicit ev: Linebacker[F]): Linebacker[F] = ev

def fromExecutorService[F[_]](es: ExecutorService): Linebacker[F] = new Linebacker[F] {
def blockingPool = ExecutionContext.fromExecutorService(es)
def blockingContext = ExecutionContext.fromExecutorService(es)
}
def fromExecutionContext[F[_]](ec: ExecutionContext): Linebacker[F] = new Linebacker[F] {
def blockingPool = ec
def blockingContext = ec
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.chrisdavenport.linebacker

import cats._
import cats.effect.Async
import cats.implicits._
import scala.concurrent.ExecutionContext
Expand All @@ -23,11 +22,8 @@ trait Quarterback[F[_], K] {
def fleaFlicker[A](fa: F[A], initial: K, end: K)(implicit F: Async[F]): F[A] =
for {
iEC <- select(initial)
_ <- Async.shift(iEC)
aE <- fa.attempt
eEC <- select(end)
_ <- Async.shift(eEC)
a <- Applicative[F].pure(aE).rethrow
endEC <- select(end)
a <- dualShift(iEC, endEC, fa)
} yield a
}
object Quarterback {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.chrisdavenport.linebacker.contexts

import cats.effect.Sync
import cats.effect.{Resource, Sync}
import cats.implicits._
import fs2.Stream
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutorService, Executors => E, ForkJoinPool, ThreadFactory}

Expand All @@ -18,16 +17,16 @@ object Executors {
* resource with the same number of threads.
*/
def fixedPool[F[_]: Sync](n: Int) =
streamExecutorService(fixedPoolExecutorUnsafe(n))
executorServiceResource(fixedPoolExecutorUnsafe(n))

/**
* Constructs an unbound thread pool that will create
* a new thread for each submitted job. This is useful
* if you have a construct that is blocking but
* self-manages the number of threads you can consume.
*/
def unbound[F[_]: Sync]: Stream[F, ExecutorService] =
streamExecutorService(unboundExecutorUnsafe)
def unbound[F[_]: Sync]: Resource[F, ExecutorService] =
executorServiceResource(unboundExecutorUnsafe)

/**
* A work stealing pool is often a useful blocking
Expand All @@ -39,20 +38,18 @@ object Executors {
* requests or other work.
*/
def workStealingPool[F[_]: Sync](n: Int) =
streamExecutorService(workStealingPoolUnsafe(n))
executorServiceResource(workStealingPoolUnsafe(n))

/**
* Default Pool For Scala, optimized for forked work and then returning to a
* main pool, generally ideal for your main event loop.
*/
def forkJoinPool[F[_]: Sync](n: Int) =
streamExecutorService(forkJoinPoolUnsafe(n))
executorServiceResource(forkJoinPoolUnsafe(n))

private def streamExecutorService[F[_]: Sync](f: F[ExecutorService]): Stream[F, ExecutorService] =
Stream.bracket(f)(
_.pure[Stream[F, ?]],
es => Sync[F].delay(es.shutdownNow).void
)
private def executorServiceResource[F[_]: Sync](
f: F[ExecutorService]): Resource[F, ExecutorService] =
Resource.make[F, ExecutorService](f)(es => Sync[F].delay(es.shutdownNow).void)

object unsafe {
def unboundExecutorUnsafe[F[_]: Sync]: F[ExecutorService] = Sync[F].delay {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.chrisdavenport

import cats.effect.Async
import scala.concurrent.ExecutionContext

package object linebacker {
private[linebacker] def dualShift[F[_]: Async, A](
initialEc: ExecutionContext,
endEc: ExecutionContext,
fa: F[A]) =
Async[F].bracket(Async.shift[F](initialEc))(_ => fa)(_ => Async.shift[F](endEc))

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.chrisdavenport.linebacker

import org.specs2._
import cats.effect.IO
import cats.effect._
import cats.implicits._
import fs2.Stream
import java.lang.Thread
import scala.concurrent.ExecutionContext
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -17,17 +16,15 @@ class LinebackerSpec extends Spec {
"""

def runsOnLinebacker = {
E.unbound[IO]
val testRun = E
.unbound[IO]
.map(Linebacker.fromExecutorService[IO])
.flatMap { implicit linebacker =>
.use { implicit linebacker =>
import scala.concurrent.ExecutionContext.Implicits.global
Stream.eval(
Linebacker[IO].block(IO(Thread.currentThread().getName))
)
Linebacker[IO].blockEc(IO(Thread.currentThread().getName))
}
.compile
.last
.unsafeRunSync must_== Some("linebacker-thread-0")

testRun.unsafeRunSync must_=== "linebacker-thread-0"
}

def runsOffLinebackerAfterwards = {
Expand All @@ -46,14 +43,10 @@ class LinebackerSpec extends Spec {

implicit val linebacker = Linebacker.fromExecutionContext[IO](ec)

Stream
.eval(
Linebacker[IO].block(IO.unit) *>
IO(Thread.currentThread().getName)
<* IO(executor.shutdownNow)
)
.compile
.last
.unsafeRunSync must_== Some("test-ec-1")
val testRun = Linebacker[IO].blockEc(IO.unit) *>
IO(Thread.currentThread().getName) <*
IO(executor.shutdownNow)

testRun.unsafeRunSync must_=== "test-ec-1"
}
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.1.5
sbt.version=1.1.6
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.1.1-SNAPSHOT"
version in ThisBuild := "0.2.0-SNAPSHOT"