Skip to content

Commit

Permalink
Merge pull request #19 from ChristopherDavenport/blockTimer
Browse files Browse the repository at this point in the history
Add blockTimer and rename to blockShift
  • Loading branch information
ChristopherDavenport authored May 31, 2018
2 parents 7d0a41b + ca5545e commit ce9225d
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 77 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: false
language: scala

scala:
- 2.12.4
- 2.12.6
- 2.11.12

jdk:
Expand All @@ -24,7 +24,7 @@ script:

after_success:
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && sbt ++$TRAVIS_SCALA_VERSION publish
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && test $TRAVIS_SCALA_VERSION == "2.12.4" && sbt microsite/publishMicrosite
- test $TRAVIS_PULL_REQUEST == "false" && test $TRAVIS_BRANCH == "master" && test $TRAVIS_REPO_SLUG == "ChristopherDavenport/linebacker" && test $TRAVIS_SCALA_VERSION == "2.12.6" && sbt microsite/publishMicrosite

cache:
directories:
Expand Down
21 changes: 9 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ First some imports

```tut:silent
import scala.concurrent.ExecutionContext.Implicits.global
import fs2.Stream
import cats.effect._
import cats.implicits._
import io.chrisdavenport.linebacker.Linebacker
Expand All @@ -29,20 +28,18 @@ Creating And Evaluating Pool Behavior
```tut
val getThread = IO(Thread.currentThread().getName)
object ThreadNameExample {
val checkRun = Executors.unbound[IO] // Create Executor
val checkRun = {
Executors.unbound[IO] // Create Executor
.map(Linebacker.fromExecutorService[IO](_)) // Create Linebacker From Executor
.flatMap { implicit linebacker => // Raise Implicitly
Stream.eval(
Linebacker[IO].block(getThread) // Block On Linebacker Pool Not Global
) ++
Stream.eval(getThread) // Running On Global
.use{ implicit linebacker => // Raise Implicitly
Linebacker[IO].blockEc(getThread) // Block On Linebacker Pool Not Global
.flatMap(threadName => IO(println(threadName))) >>
getThread // Running On Global
.flatMap(threadName => IO(println(threadName)))
}
.evalMap(threadName => IO(println(threadName)))
.compile
.drain
}
ThreadNameExample.checkRun.unsafeRunSync
checkRun.unsafeRunSync
```

Dual Contexts Are Also Very Useful
Expand Down
7 changes: 3 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ lazy val commonSettings = Seq(
organization := "io.chrisdavenport",
scalaVersion := "2.12.6",
crossScalaVersions := Seq(scalaVersion.value, "2.11.12"),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.6" cross CrossVersion.binary),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.7" cross CrossVersion.binary),
scalafmtOnCompile := true,
scalafmtTestOnCompile := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "0.10.1",
"co.fs2" %% "fs2-core" % "0.10.3",
"org.specs2" %% "specs2-core" % "4.0.3" % Test
"org.typelevel" %% "cats-effect" % "1.0.0-RC2-d7181dc",
"org.specs2" %% "specs2-core" % "4.2.0" % Test
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext
Expand All @@ -11,12 +10,7 @@ trait DualContext[F[_]] {
def defaultContext: ExecutionContext

def block[A](fa: F[A])(implicit F: Async[F]): F[A] =
for {
_ <- Async.shift(blockingContext)
aE <- fa.attempt
_ <- Async.shift(defaultContext)
a <- Applicative[F].pure(aE).rethrow
} yield a
dualShift(blockingContext, defaultContext, fa)
}

object DualContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
package io.chrisdavenport.linebacker

import cats._
import cats.implicits._
import cats.effect.Async
import cats.effect._
import java.util.concurrent.ExecutorService
import scala.concurrent.ExecutionContext

trait Linebacker[F[_]] {

def blockingPool: ExecutionContext
def blockingContext: ExecutionContext

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the given implicit execution context
* after the Async `F[A]` is evaluated.
*/
final def block[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
for {
_ <- Async.shift(blockingPool)
eA <- fa.attempt
_ <- Async.shift(ec)
a <- Applicative[F].pure(eA).rethrow
} yield a
final def blockEc[A](fa: F[A])(implicit F: Async[F], ec: ExecutionContext): F[A] =
dualShift(blockingContext, ec, fa)

/**
* Attempts to Run the Given `F[A]` on the blocking pool.
* Then shifts back to the F for the timer.
*/
final def blockTimer[A](fa: F[A])(implicit F: Async[F], timer: Timer[F]): F[A] =
F.bracket(Async.shift[F](blockingContext))(_ => fa)(_ => timer.shift)
}

object Linebacker {
def apply[F[_]](implicit ev: Linebacker[F]): Linebacker[F] = ev

def fromExecutorService[F[_]](es: ExecutorService): Linebacker[F] = new Linebacker[F] {
def blockingPool = ExecutionContext.fromExecutorService(es)
def blockingContext = ExecutionContext.fromExecutorService(es)
}
def fromExecutionContext[F[_]](ec: ExecutionContext): Linebacker[F] = new Linebacker[F] {
def blockingPool = ec
def blockingContext = ec
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.chrisdavenport.linebacker

import cats._
import cats.effect.Async
import cats.implicits._
import scala.concurrent.ExecutionContext
Expand All @@ -23,11 +22,8 @@ trait Quarterback[F[_], K] {
def fleaFlicker[A](fa: F[A], initial: K, end: K)(implicit F: Async[F]): F[A] =
for {
iEC <- select(initial)
_ <- Async.shift(iEC)
aE <- fa.attempt
eEC <- select(end)
_ <- Async.shift(eEC)
a <- Applicative[F].pure(aE).rethrow
endEC <- select(end)
a <- dualShift(iEC, endEC, fa)
} yield a
}
object Quarterback {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.chrisdavenport.linebacker.contexts

import cats.effect.Sync
import cats.effect.{Resource, Sync}
import cats.implicits._
import fs2.Stream
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutorService, Executors => E, ForkJoinPool, ThreadFactory}

Expand All @@ -18,16 +17,16 @@ object Executors {
* resource with the same number of threads.
*/
def fixedPool[F[_]: Sync](n: Int) =
streamExecutorService(fixedPoolExecutorUnsafe(n))
executorServiceResource(fixedPoolExecutorUnsafe(n))

/**
* Constructs an unbound thread pool that will create
* a new thread for each submitted job. This is useful
* if you have a construct that is blocking but
* self-manages the number of threads you can consume.
*/
def unbound[F[_]: Sync]: Stream[F, ExecutorService] =
streamExecutorService(unboundExecutorUnsafe)
def unbound[F[_]: Sync]: Resource[F, ExecutorService] =
executorServiceResource(unboundExecutorUnsafe)

/**
* A work stealing pool is often a useful blocking
Expand All @@ -39,20 +38,18 @@ object Executors {
* requests or other work.
*/
def workStealingPool[F[_]: Sync](n: Int) =
streamExecutorService(workStealingPoolUnsafe(n))
executorServiceResource(workStealingPoolUnsafe(n))

/**
* Default Pool For Scala, optimized for forked work and then returning to a
* main pool, generally ideal for your main event loop.
*/
def forkJoinPool[F[_]: Sync](n: Int) =
streamExecutorService(forkJoinPoolUnsafe(n))
executorServiceResource(forkJoinPoolUnsafe(n))

private def streamExecutorService[F[_]: Sync](f: F[ExecutorService]): Stream[F, ExecutorService] =
Stream.bracket(f)(
_.pure[Stream[F, ?]],
es => Sync[F].delay(es.shutdownNow).void
)
private def executorServiceResource[F[_]: Sync](
f: F[ExecutorService]): Resource[F, ExecutorService] =
Resource.make[F, ExecutorService](f)(es => Sync[F].delay(es.shutdownNow).void)

object unsafe {
def unboundExecutorUnsafe[F[_]: Sync]: F[ExecutorService] = Sync[F].delay {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.chrisdavenport

import cats.effect.Async
import scala.concurrent.ExecutionContext

package object linebacker {
private[linebacker] def dualShift[F[_]: Async, A](
initialEc: ExecutionContext,
endEc: ExecutionContext,
fa: F[A]) =
Async[F].bracket(Async.shift[F](initialEc))(_ => fa)(_ => Async.shift[F](endEc))

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.chrisdavenport.linebacker

import org.specs2._
import cats.effect.IO
import cats.effect._
import cats.implicits._
import fs2.Stream
import java.lang.Thread
import scala.concurrent.ExecutionContext
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -17,17 +16,15 @@ class LinebackerSpec extends Spec {
"""

def runsOnLinebacker = {
E.unbound[IO]
val testRun = E
.unbound[IO]
.map(Linebacker.fromExecutorService[IO])
.flatMap { implicit linebacker =>
.use { implicit linebacker =>
import scala.concurrent.ExecutionContext.Implicits.global
Stream.eval(
Linebacker[IO].block(IO(Thread.currentThread().getName))
)
Linebacker[IO].blockEc(IO(Thread.currentThread().getName))
}
.compile
.last
.unsafeRunSync must_== Some("linebacker-thread-0")

testRun.unsafeRunSync must_=== "linebacker-thread-0"
}

def runsOffLinebackerAfterwards = {
Expand All @@ -46,14 +43,10 @@ class LinebackerSpec extends Spec {

implicit val linebacker = Linebacker.fromExecutionContext[IO](ec)

Stream
.eval(
Linebacker[IO].block(IO.unit) *>
IO(Thread.currentThread().getName)
<* IO(executor.shutdownNow)
)
.compile
.last
.unsafeRunSync must_== Some("test-ec-1")
val testRun = Linebacker[IO].blockEc(IO.unit) *>
IO(Thread.currentThread().getName) <*
IO(executor.shutdownNow)

testRun.unsafeRunSync must_=== "test-ec-1"
}
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.1.5
sbt.version=1.1.6
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.1.1-SNAPSHOT"
version in ThisBuild := "0.2.0-SNAPSHOT"

0 comments on commit ce9225d

Please sign in to comment.