Skip to content

Commit

Permalink
Upgrade Kubernetes client and use more cats effect
Browse files Browse the repository at this point in the history
  • Loading branch information
joan38 committed Nov 26, 2018
1 parent d35a0b0 commit 8c7f985
Show file tree
Hide file tree
Showing 43 changed files with 1,198 additions and 1,191 deletions.
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ lazy val `orkestra-core` = crossProject(JVMPlatform, JSPlatform)
"com.chuusai" %%% "shapeless" % "2.3.3",
"com.vmunier" %% "scalajs-scripts" % "1.1.2",
"com.lihaoyi" %%% "autowire" % "0.2.6",
"com.goyeau" %% "kubernetes-client" % "0.0.5",
"com.goyeau" %% "kubernetes-client" % "0.3.0",
"org.typelevel" %% "cats-effect" % "1.0.0",
"org.scala-lang" % "scala-reflect" % scalaVersion.value
) ++
scalaJsReact.value ++
akkaHttp.value ++
akkaHttpCirce.value ++
circe.value ++
cats.value ++
scalaCss.value ++
logging.value ++
elastic4s.value ++
Expand Down Expand Up @@ -140,6 +141,7 @@ lazy val `orkestra-integration-tests` = crossProject(JVMPlatform, JSPlatform)
version ~= (_.replace('+', '-')),
buildInfoPackage := s"${organization.value}.integration.tests",
buildInfoKeys += "artifactName" -> artifact.value.name,
scalaJSUseMainModuleInitializer := true,
libraryDependencies ++= scalaTest.value,
publishArtifact := false,
publishLocal := {}
Expand Down Expand Up @@ -183,6 +185,10 @@ lazy val circe = Def.setting {
)
}

lazy val cats = Def.setting {
Seq("org.typelevel" %%% "cats-effect" % "1.0.0")
}

lazy val logging = Def.setting {
Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.8.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package tech.orkestra

import cats.effect.{ExitCode, IO, IOApp}
import tech.orkestra.board.Board
import tech.orkestra.css.AppCss
import tech.orkestra.route.WebRouter
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import org.scalajs.dom

/**
* Mix in this trait to create the Orkestra server.
*/
trait OrkestraServer[F[_]] extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
implicit override def orkestraConfig: OrkestraConfig = ???
implicit override def kubernetesClient: KubernetesClient = ???
implicit override def elasticsearchClient: ElasticClient = ???

def board: Board

def main(args: Array[String]): Unit = {
def run(args: List[String]): IO[ExitCode] = IO {
AppCss.load()
WebRouter.router(board).renderIntoDOM(dom.document.getElementById(BuildInfo.projectName.toLowerCase))
ExitCode.Success
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package tech.orkestra

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives.{entity, _}
import autowire.Core
import cats.Applicative
import cats.effect.{ExitCode, IO, IOApp}
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import com.typesafe.scalalogging.Logger
Expand All @@ -14,24 +14,24 @@ import io.circe.Json
import io.circe.generic.auto._
import io.circe.shapes._
import io.circe.java8.time._
import scalajs.html.scripts
import tech.orkestra.utils.AkkaImplicits._
import tech.orkestra.job.Job
import tech.orkestra.kubernetes.Kubernetes
import tech.orkestra.utils.{AutowireServer, Elasticsearch}
import scalajs.html.scripts

/**
* Mix in this trait to create the Orkestra job server.
*/
trait OrkestraServer[F[_]] extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
private lazy val logger = Logger(getClass)
override lazy val F: Applicative[IO] = implicitly[Applicative[IO]]
implicit override lazy val orkestraConfig: OrkestraConfig = OrkestraConfig.fromEnvVars()
implicit override lazy val kubernetesClient: KubernetesClient = Kubernetes.client
implicit override lazy val elasticsearchClient: ElasticClient = Elasticsearch.client

def jobs: Set[Job[F, _, _]]
def jobs: Set[Job[IO, _, _]]

lazy val routes =
def routes(implicit kubernetesClient: KubernetesClient[IO]) =
pathPrefix("assets" / Remaining) { file =>
encodeResponse {
getFromResource(s"public/$file")
Expand Down Expand Up @@ -69,32 +69,30 @@ trait OrkestraServer[F[_]] extends OrkestraPlugin {
path(OrkestraConfig.commonSegment / Segments) { segments =>
entity(as[Json]) { json =>
val body = AutowireServer.read[Map[String, Json]](json)
val request = AutowireServer.route[CommonApi](CommonApiServer())(Core.Request(segments, body))
val request = AutowireServer.route[CommonApi](CommonApiServer[IO]())(Core.Request(segments, body))
onSuccess(request)(json => complete(json))
}
}
}

def main(args: Array[String]): Unit =
Await.result(
orkestraConfig.runInfoMaybe.fold {
for {
_ <- Future(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init()
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart()
_ <- Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)
} yield ()
} { runInfo =>
for {
_ <- Future(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ()
},
Duration.Inf
)
def run(args: List[String]): IO[ExitCode] = Kubernetes.client[IO].use { implicit kubernetesClient =>
orkestraConfig.runInfoMaybe.fold {
for {
_ <- IO.pure(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init[IO]
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart(kubernetesClient)
_ <- IO.fromFuture(IO(Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)))
} yield ExitCode.Success
} { runInfo =>
for {
_ <- IO.delay(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ExitCode.Success
}
}
}
19 changes: 13 additions & 6 deletions orkestra-core/src/main/scala/tech/orkestra/CommonApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package tech.orkestra
import java.io.IOException
import java.time.Instant

import cats.effect.ConcurrentEffect
import cats.implicits._

import scala.concurrent.Future
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.circe._
Expand All @@ -26,9 +29,9 @@ object CommonApi {
val client = AutowireClient(OrkestraConfig.commonSegment)[CommonApi]
}

case class CommonApiServer()(
case class CommonApiServer[F[_]: ConcurrentEffect]()(
implicit orkestraConfig: OrkestraConfig,
kubernetesClient: KubernetesClient,
kubernetesClient: KubernetesClient[F],
elasticsearchClient: ElasticClient
) extends CommonApi {
import tech.orkestra.utils.AkkaImplicits._
Expand Down Expand Up @@ -58,10 +61,14 @@ case class CommonApiServer()(

override def runningJobs(): Future[Seq[Run[HNil, Unit]]] =
for {
runInfos <- kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list()
.map(_.items.map(RunInfo.fromKubeJob))
runInfos <- ConcurrentEffect[F]
.toIO(
kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list
.map(_.items.map(RunInfo.fromKubeJob))
)
.unsafeToFuture()

runs <- if (runInfos.nonEmpty)
elasticsearchClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import scala.io.Source
import com.sksamuel.elastic4s.http.ElasticProperties
import io.circe.generic.auto._
import io.circe.parser._
import org.http4s.Uri
import tech.orkestra.model.{EnvRunInfo, RunId, RunInfo}

case class OrkestraConfig(
elasticsearchProperties: ElasticProperties,
workspace: String = OrkestraConfig.defaultWorkspace,
port: Int = OrkestraConfig.defaultBindPort,
runInfoMaybe: Option[RunInfo] = None,
kubeUri: String,
kubeUri: Uri,
namespace: String,
podName: String,
basePath: String = OrkestraConfig.defaultBasePath
Expand All @@ -34,7 +35,8 @@ object OrkestraConfig {
fromEnvVar("RUN_INFO").map { runInfoJson =>
decode[EnvRunInfo](runInfoJson).fold(throw _, runInfo => RunInfo(runInfo.jobId, runInfo.runId.getOrElse(jobUid)))
},
fromEnvVar("KUBE_URI").getOrElse(throw new IllegalStateException("ORKESTRA_KUBE_URI should be set")),
fromEnvVar("KUBE_URI")
.fold(throw new IllegalStateException("ORKESTRA_KUBE_URI should be set"))(Uri.unsafeFromString),
fromEnvVar("NAMESPACE").getOrElse(throw new IllegalStateException("ORKESTRA_NAMESPACE should be set")),
fromEnvVar("POD_NAME").getOrElse(throw new IllegalStateException("ORKESTRA_POD_NAME should be set")),
fromEnvVar("BASEPATH").getOrElse(defaultBasePath)
Expand Down
10 changes: 5 additions & 5 deletions orkestra-core/src/main/scala/tech/orkestra/OrkestraPlugin.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package tech.orkestra

import scala.concurrent.Future
import cats.Applicative
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import tech.orkestra.model.RunInfo

trait OrkestraPlugin {
trait OrkestraPlugin[F[_]] {
implicit protected def F: Applicative[F]
implicit protected def orkestraConfig: OrkestraConfig
implicit protected def kubernetesClient: KubernetesClient
implicit protected def elasticsearchClient: ElasticClient

def onMasterStart(): Future[Unit] = Future.unit
def onJobStart(runInfo: RunInfo): Future[Unit] = Future.unit
def onMasterStart(kubernetesClient: KubernetesClient[F]): F[Unit] = Applicative[F].unit
def onJobStart(runInfo: RunInfo): F[Unit] = Applicative[F].unit
}
8 changes: 6 additions & 2 deletions orkestra-core/src/main/scala/tech/orkestra/input/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ case class Text[T: Encoder: Decoder](name: String, default: Option[T] = None) ex
<.span(name),
<.input.text(
^.key := id.name,
^.value := state.get(id).map(_.asInstanceOf[T]) // scalafix:ok
^.value := state
.get(id)
.map(_.asInstanceOf[T]) // scalafix:ok
.orElse(default)
.fold("")(implicitly[Encoder[T]].apply(_)),
^.onChange ==> modValue
Expand Down Expand Up @@ -88,7 +90,9 @@ case class Select[Entry <: EnumEntry](name: String, enum: Enum[Entry], default:
<.span(name),
<.select(
^.key := id.name,
^.value := state.get(id).map(_.asInstanceOf[Entry]) // scalafix:ok
^.value := state
.get(id)
.map(_.asInstanceOf[Entry]) // scalafix:ok
.orElse(default)
.map(_.entryName)
.getOrElse(disabled),
Expand Down
Loading

0 comments on commit 8c7f985

Please sign in to comment.