Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cats effect and simplify internals using HList directly #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ rules = [
ProcedureSyntax
]
DisableSyntax.noVars = true
DisableSyntax.noThrows = true
//DisableSyntax.noThrows = true
DisableSyntax.noNulls = true
DisableSyntax.noReturns = true
DisableSyntax.noAsInstanceOf = true
Expand Down
22 changes: 10 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import microsites.ExtraMdFileConfig
import org.scalajs.sbtplugin.ScalaJSCrossVersion
import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}

name := "Orkestra"
ThisBuild / organization := "tech.orkestra"
ThisBuild / licenses += "APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0")
ThisBuild / homepage := Option(url("https://orkestra.tech"))
Expand All @@ -20,15 +21,11 @@ Global / releaseEarlyEnableLocalReleases := true
ThisBuild / scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-language:higherKinds",
"-Xlint:unsound-match",
"-Yrangepos",
"-Ywarn-inaccessible",
"-Ywarn-infer-any",
"-Ywarn-unused:imports",
"-Ywarn-unused:locals",
"-Ywarn-unused:patvars",
"-Ywarn-unused:privates",
"-language:higherKinds",
"-Ywarn-unused",
"-Ypartial-unification",
"-Ywarn-dead-code"
)
Expand Down Expand Up @@ -58,8 +55,7 @@ lazy val `orkestra-core` = crossProject(JVMPlatform, JSPlatform)
"com.chuusai" %%% "shapeless" % "2.3.3",
"com.vmunier" %% "scalajs-scripts" % "1.1.2",
"com.lihaoyi" %%% "autowire" % "0.2.6",
"com.goyeau" %% "kubernetes-client" % "0.0.5",
"org.typelevel" %% "cats-effect" % "1.0.0",
"com.goyeau" %% "kubernetes-client" % "0.3.0",
"org.scala-lang" % "scala-reflect" % scalaVersion.value
) ++
scalaJsReact.value ++
Expand Down Expand Up @@ -140,6 +136,7 @@ lazy val `orkestra-integration-tests` = crossProject(JVMPlatform, JSPlatform)
version ~= (_.replace('+', '-')),
buildInfoPackage := s"${organization.value}.integration.tests",
buildInfoKeys += "artifactName" -> artifact.value.name,
scalaJSUseMainModuleInitializer := true,
libraryDependencies ++= scalaTest.value,
publishArtifact := false,
publishLocal := {}
Expand Down Expand Up @@ -173,7 +170,7 @@ lazy val akkaHttpCirce = Def.setting {
}

lazy val circe = Def.setting {
val version = "0.9.3"
val version = "0.10.1"
Seq(
"io.circe" %%% "circe-core" % version,
"io.circe" %%% "circe-generic" % version,
Expand All @@ -199,15 +196,15 @@ lazy val scalaCss = Def.setting {
}

lazy val scalaJsReact = Def.setting {
val scalaJsReactVersion = "1.2.3"
val scalaJsReactVersion = "1.3.1"
Seq(
("com.github.japgolly.scalajs-react" % "core" % scalaJsReactVersion).cross(ScalaJSCrossVersion.binary),
("com.github.japgolly.scalajs-react" % "extra" % scalaJsReactVersion).cross(ScalaJSCrossVersion.binary)
)
}

lazy val react = Def.setting {
val reactVersion = "16.2.0"
val reactVersion = "16.5.1"
Seq(
("org.webjars.npm" % "react" % reactVersion / "umd/react.development.js")
.minified("umd/react.production.min.js")
Expand All @@ -224,10 +221,11 @@ lazy val react = Def.setting {
}

lazy val elastic4s = Def.setting {
val elastic4sVersion = "6.2.3"
val elastic4sVersion = "6.5.0"
Seq(
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,
"com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion,
"com.sksamuel.elastic4s" %% "elastic4s-cats-effect" % elastic4sVersion,
"com.sksamuel.elastic4s" %% "elastic4s-circe" % elastic4sVersion,
"com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % Test
)
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/tut/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ spec:
targetPort: 9300

---
apiVersion: apps/v1beta2
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: elasticsearch
Expand Down Expand Up @@ -141,7 +141,7 @@ spec:
targetPort: 8081

---
apiVersion: apps/v1beta2
apiVersion: apps/v1
kind: Deployment
metadata:
name: orkestra
Expand Down
2 changes: 1 addition & 1 deletion examples/kubernetes-dev/1-elasticsearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ spec:
targetPort: 9300

---
apiVersion: apps/v1beta2
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: elasticsearch
Expand Down
2 changes: 1 addition & 1 deletion examples/kubernetes-dev/2-orchestra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
targetPort: 8081

---
apiVersion: apps/v1beta2
apiVersion: apps/v1
kind: Deployment
metadata:
name: orkestra
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package tech.orkestra

import cats.effect.{ExitCode, IO, IOApp}
import tech.orkestra.board.Board
import tech.orkestra.css.AppCss
import tech.orkestra.route.WebRouter
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.ElasticClient
import org.scalajs.dom

/**
* Mix in this trait to create the Orkestra server.
*/
trait OrkestraServer extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
implicit override def orkestraConfig: OrkestraConfig = ???
implicit override def kubernetesClient: KubernetesClient = ???
implicit override def elasticsearchClient: HttpClient = ???
implicit override def elasticsearchClient: ElasticClient = ???

def board: Board

def main(args: Array[String]): Unit = {
def run(args: List[String]): IO[ExitCode] = IO {
AppCss.load()
WebRouter.router(board).renderIntoDOM(dom.document.getElementById(BuildInfo.projectName.toLowerCase))
ExitCode.Success
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
package tech.orkestra

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives.{entity, _}
import autowire.Core
import cats.Applicative
import cats.effect.{ExitCode, IO, IOApp}
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.ElasticClient
import com.typesafe.scalalogging.Logger
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.Json
import io.circe.generic.auto._
import io.circe.shapes._
import io.circe.java8.time._
import scalajs.html.scripts
import tech.orkestra.utils.AkkaImplicits._
import tech.orkestra.job.Job
import tech.orkestra.kubernetes.Kubernetes
import tech.orkestra.utils.{AutowireServer, Elasticsearch}
import scalajs.html.scripts

/**
* Mix in this trait to create the Orkestra job server.
*/
trait OrkestraServer extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
private lazy val logger = Logger(getClass)
override lazy val F: Applicative[IO] = implicitly[Applicative[IO]]
implicit override lazy val orkestraConfig: OrkestraConfig = OrkestraConfig.fromEnvVars()
implicit override lazy val kubernetesClient: KubernetesClient = Kubernetes.client
implicit override lazy val elasticsearchClient: HttpClient = Elasticsearch.client
implicit override lazy val elasticsearchClient: ElasticClient = Elasticsearch.client

def jobs: Set[Job[_, _]]
def jobs: Set[Job[IO, _, _]]

lazy val routes =
def routes(implicit kubernetesClient: KubernetesClient[IO]) =
pathPrefix("assets" / Remaining) { file =>
encodeResponse {
getFromResource(s"public/$file")
Expand All @@ -55,7 +54,7 @@ trait OrkestraServer extends OrkestraPlugin {
|${scripts(
"web",
name => s"/assets/$name",
name => getClass.getResource(s"/public/$name") != null
name => Option(getClass.getResource(s"/public/$name")).isDefined
).body}
|</body>
|</html>
Expand All @@ -70,32 +69,30 @@ trait OrkestraServer extends OrkestraPlugin {
path(OrkestraConfig.commonSegment / Segments) { segments =>
entity(as[Json]) { json =>
val body = AutowireServer.read[Map[String, Json]](json)
val request = AutowireServer.route[CommonApi](CommonApiServer())(Core.Request(segments, body))
val request = AutowireServer.route[CommonApi](CommonApiServer[IO]())(Core.Request(segments, body))
onSuccess(request)(json => complete(json))
}
}
}

def main(args: Array[String]): Unit =
Await.result(
orkestraConfig.runInfoMaybe.fold {
for {
_ <- Future(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init()
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart()
_ <- Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)
} yield ()
} { runInfo =>
for {
_ <- Future(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ()
},
Duration.Inf
)
def run(args: List[String]): IO[ExitCode] = Kubernetes.client[IO].use { implicit kubernetesClient =>
orkestraConfig.runInfoMaybe.fold {
for {
_ <- IO.pure(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init[IO]
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart(kubernetesClient)
_ <- IO.fromFuture(IO(Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)))
} yield ExitCode.Success
} { runInfo =>
for {
_ <- IO.delay(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ExitCode.Success
}
}
}
67 changes: 38 additions & 29 deletions orkestra-core/src/main/scala/tech/orkestra/CommonApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package tech.orkestra
import java.io.IOException
import java.time.Instant

import scala.concurrent.Future
import cats.Applicative
import cats.effect.ConcurrentEffect
import cats.implicits._

import scala.concurrent.Future
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.cats.effect.instances._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.searches.sort.SortOrder
import io.circe.generic.auto._
import io.circe.java8.time._
import io.circe.shapes._
import shapeless.HNil

import tech.orkestra.model.Indexed._
import tech.orkestra.model.{Page, RunId, RunInfo}
import tech.orkestra.utils.AutowireClient
Expand All @@ -28,12 +31,12 @@ object CommonApi {
val client = AutowireClient(OrkestraConfig.commonSegment)[CommonApi]
}

case class CommonApiServer()(
implicit orkestraConfig: OrkestraConfig,
kubernetesClient: KubernetesClient,
elasticsearchClient: HttpClient
case class CommonApiServer[F[_]: ConcurrentEffect]()(
implicit
orkestraConfig: OrkestraConfig,
kubernetesClient: KubernetesClient[F],
elasticsearchClient: ElasticClient
) extends CommonApi {
import tech.orkestra.utils.AkkaImplicits._

override def logs(runId: RunId, page: Page[(Instant, Int)]): Future[Seq[LogLine]] =
elasticsearchClient
Expand All @@ -56,29 +59,35 @@ case class CommonApiServer()(
)
.size(math.abs(page.size))
)
.map(_.fold(failure => throw new IOException(failure.error.reason), identity).result.to[LogLine])
.map(response => response.fold(throw new IOException(response.error.reason))(_.to[LogLine]))
.unsafeToFuture()

override def runningJobs(): Future[Seq[Run[HNil, Unit]]] =
for {
runInfos <- kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list()
.map(_.items.map(RunInfo.fromKubeJob))
ConcurrentEffect[F]
.toIO {
for {
runInfo <- kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list
.map(_.items.map(RunInfo.fromKubeJob))

runs <- if (runInfos.nonEmpty)
elasticsearchClient
.execute(
search(HistoryIndex.index)
.query(
boolQuery.filter(
termsQuery("runInfo.runId", runInfos.map(_.runId.value)),
termsQuery("runInfo.jobId", runInfos.map(_.jobId.value))
)
runs <- if (runInfo.nonEmpty)
elasticsearchClient
.execute(
search(HistoryIndex.index)
.query(
boolQuery.filter(
termsQuery("runInfo.runId", runInfo.map(_.runId.value)),
termsQuery("runInfo.jobId", runInfo.map(_.jobId.value))
)
)
.sortBy(fieldSort("triggeredOn").desc(), fieldSort("_id").desc())
.size(1000)
)
.sortBy(fieldSort("triggeredOn").desc(), fieldSort("_id").desc())
.size(1000)
)
.map(_.fold(failure => throw new IOException(failure.error.reason), identity).result.to[Run[HNil, Unit]])
else Future.successful(Seq.empty)
} yield runs
.map(response => response.fold(throw new IOException(response.error.reason))(_.to[Run[HNil, Unit]]))
.to[F]
else Applicative[F].pure(IndexedSeq.empty[Run[HNil, Unit]])
} yield runs
}
.unsafeToFuture()
}
Loading