Skip to content

Commit

Permalink
Add parallel interpreter (#101)
Browse files Browse the repository at this point in the history
* Make metals happy

* Include metals files in gitignore

* .map .sequence => .traverse

* First cut -- ParallelInterpreter

* Update changelog

* Prove consistency with NaiveInterpreter

* Fix evaluation (thanks, tests!)

* scalafmt

* Add target monad to interpreter so ParallelInterpreter can extend it

Necessary for downstream use in e.g. geotrellis-server

* Add sleep expression and use it to test parallelism

* Don't know how to interpret sleep by default

* Make test look better

* ParallelInterpreter -> ConcurrentInterpreter

* Add actually parallel interpreter + tests
  • Loading branch information
jisantuc authored and moradology committed May 15, 2019
1 parent f92a30b commit 81197d4
Show file tree
Hide file tree
Showing 15 changed files with 844 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ nohup.out

site/
.metadata/

.metals
.bloop
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version=2.0.0-RC4
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add README [#92](https://github.com/geotrellis/maml/pull/92)
- Add STRTA and migrate to CircleCI [#93](https://github.com/geotrellis/maml/pull/93)
- Add changelog and pull request template [#96](https://github.com/geotrellis/maml/pull/96)
- Added `ParallelInterpreter` [#101](https://github.com/geotrellis/maml/pull/101)

### Changed
- Fixed 2.12 compilation in tests [#95](https://github.com/geotrellis/maml/pull/95)
Expand Down
58 changes: 58 additions & 0 deletions jvm/src/main/scala/eval/ConcurrentInterpreter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.azavea.maml.eval

import com.azavea.maml.ast._
import com.azavea.maml.error._
import com.azavea.maml.eval.directive._

import cats._
import cats.implicits._
import cats.data.Validated._
import cats.data.{NonEmptyList => NEL, _}
import cats.effect.{Concurrent, Fiber}

import scala.reflect.ClassTag

class ConcurrentInterpreter[F[_]](directives: List[Directive])(
implicit Conc: Concurrent[F]
) extends Interpreter[F] {
def apply(exp: Expression): F[Interpreted[Result]] = {
val children = evalInF(exp)
val out = children map {
_.andThen({ childRes =>
instructions(exp, childRes)
})
}
out
}

def evalInF(expression: Expression): F[Interpreted[List[Result]]] = {
val fibsF: F[List[Fiber[F, Interpreted[Result]]]] =
expression.children traverse { expr =>
Conc.start(apply(expr))
}
fibsF flatMap { _.traverse { _.join } } map { _.sequence }
}

val fallbackDirective: Directive = {
case (exp, res) => Invalid(NEL.of(UnhandledCase(exp, exp.kind)))
}

def prependDirective(directive: Directive) =
new ConcurrentInterpreter[F](directive +: directives)

def appendDirective(directive: Directive) =
new ConcurrentInterpreter[F](directives :+ directive)

def instructions(
expression: Expression,
children: List[Result]
): Interpreted[Result] =
directives
.reduceLeft(_ orElse _)
.orElse(fallbackDirective)((expression, children))
}

object ConcurrentInterpreter {
def DEFAULT[T[_]: Concurrent] =
new ConcurrentInterpreter[T](NaiveInterpreter.DEFAULT.directives)
}
4 changes: 2 additions & 2 deletions jvm/src/main/scala/eval/Interpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import cats.data.{NonEmptyList => NEL, _}
import scala.reflect.ClassTag


trait Interpreter {
def apply(exp: Expression): Interpreted[Result]
trait Interpreter[F[_]] {
def apply(exp: Expression): F[Interpreted[Result]]
}

object Interpreter {
Expand Down
8 changes: 4 additions & 4 deletions jvm/src/main/scala/eval/NaiveInterpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import cats.data.{NonEmptyList => NEL, _}
import scala.reflect.ClassTag


case class NaiveInterpreter(directives: List[Directive]) extends Interpreter {
case class NaiveInterpreter(directives: List[Directive]) extends Interpreter[Id] {

def apply(exp: Expression): Interpreted[Result] = {
val children: Interpreted[List[Result]] = exp.children.map(apply).sequence
val children: Interpreted[List[Result]] = exp.children.traverse(apply)
children.andThen({ childRes => instructions(exp, childRes) })
}

def prependDirective(directive: Directive): Interpreter =
def prependDirective(directive: Directive): Interpreter[Id] =
NaiveInterpreter(directive +: directives)

def appendDirective(directive: Directive): Interpreter =
def appendDirective(directive: Directive): Interpreter[Id] =
NaiveInterpreter(directives :+ directive)

val fallbackDirective: Directive =
Expand Down
65 changes: 65 additions & 0 deletions jvm/src/main/scala/eval/ParallelInterpreter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.azavea.maml.eval

import com.azavea.maml.ast._
import com.azavea.maml.error._
import com.azavea.maml.eval.directive._

import cats._
import cats.implicits._
import cats.data.Validated._
import cats.data.{NonEmptyList => NEL, _}
import cats.effect.ContextShift

import scala.reflect.ClassTag

class ParallelInterpreter[F[_]: Monad, G[_]](directives: List[Directive])(
implicit Par: Parallel[F, G],
contextShift: ContextShift[F]
) extends Interpreter[F] {
def apply(exp: Expression): F[Interpreted[Result]] = {
val children = evalInF(exp)
val out = children map {
_.andThen({ childRes =>
instructions(exp, childRes)
})
}
out
}

def evalInF(
expression: Expression
)(implicit contextShift: ContextShift[F]): F[Interpreted[List[Result]]] = {
val resultsF: F[List[Interpreted[Result]]] =
expression.children parTraverse { expr =>
apply(expr)
}
resultsF map { _.sequence }
}

val fallbackDirective: Directive = {
case (exp, res) => Invalid(NEL.of(UnhandledCase(exp, exp.kind)))
}

def prependDirective(directive: Directive) =
new ParallelInterpreter[F, G](directive +: directives)

def appendDirective(directive: Directive) =
new ParallelInterpreter[F, G](directives :+ directive)

def instructions(
expression: Expression,
children: List[Result]
): Interpreted[Result] =
directives
.reduceLeft(_ orElse _)
.orElse(fallbackDirective)((expression, children))
}

object ParallelInterpreter {
def DEFAULT[T[_], U[_]](
implicit P: Parallel[T, U],
M: Monad[T],
contextShift: ContextShift[T]
) =
new ParallelInterpreter[T, U](NaiveInterpreter.DEFAULT.directives)
}
6 changes: 3 additions & 3 deletions jvm/src/main/scala/eval/ScopedInterpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import cats.data.{NonEmptyList => NEL, _}
import geotrellis.raster.GridBounds


trait ScopedInterpreter[Scope] extends Interpreter {
trait ScopedInterpreter[Scope] extends Interpreter[Id] {
def scopeFor(exp: Expression, previous: Option[Scope]): Scope
def appendDirective(directive: ScopedDirective[Scope]): ScopedInterpreter[Scope]
def prependDirective(directive: ScopedDirective[Scope]): ScopedInterpreter[Scope]
Expand All @@ -20,10 +20,10 @@ trait ScopedInterpreter[Scope] extends Interpreter {
def apply(exp: Expression): Interpreted[Result] = {
def eval(exp: Expression, maybeScope: Option[Scope] = None): Interpreted[Result] = {
val currentScope = scopeFor(exp, maybeScope)
val children: Interpreted[List[Result]] = exp.children.map({ childTree =>
val children: Interpreted[List[Result]] = exp.children.traverse({ childTree =>
val childScope = scopeFor(childTree, Some(currentScope))
eval(childTree, Some(childScope))
}).sequence
})
children.andThen({ childResult => instructions(exp, childResult, currentScope) })
}
eval(exp)
Expand Down
7 changes: 7 additions & 0 deletions jvm/src/main/scala/eval/directive/OpDirectives.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Validated._
import geotrellis.vector._
import geotrellis.raster.{Tile, isData}

import scala.concurrent.duration._
import scala.util.Try


Expand Down Expand Up @@ -348,6 +349,12 @@ object OpDirectives {
Valid(results)
}

/** Sleeping */
val sleep = Directive { case (Sleep(n, _), childResults) =>
Thread.sleep(n * 1000)
Valid(childResults.head)
}

/** Tile-specific Operations */
val masking = Directive { case (mask@Masking(_), childResults) =>
((childResults(0), childResults(1)) match {
Expand Down
Loading

0 comments on commit 81197d4

Please sign in to comment.