Skip to content

Commit

Permalink
Upgrade Kubernetes client and use more cats effect
Browse files Browse the repository at this point in the history
  • Loading branch information
joan38 committed Jan 20, 2019
1 parent d35a0b0 commit bcd96e0
Show file tree
Hide file tree
Showing 52 changed files with 1,455 additions and 1,309 deletions.
11 changes: 4 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ ThisBuild / scalacOptions ++= Seq(
"-Xlint:unsound-match",
"-Ywarn-inaccessible",
"-Ywarn-infer-any",
"-Ywarn-unused:imports",
"-Ywarn-unused:locals",
"-Ywarn-unused:patvars",
"-Ywarn-unused:privates",
"-Ywarn-unused",
"-Ypartial-unification",
"-Ywarn-dead-code"
)
Expand Down Expand Up @@ -58,8 +55,7 @@ lazy val `orkestra-core` = crossProject(JVMPlatform, JSPlatform)
"com.chuusai" %%% "shapeless" % "2.3.3",
"com.vmunier" %% "scalajs-scripts" % "1.1.2",
"com.lihaoyi" %%% "autowire" % "0.2.6",
"com.goyeau" %% "kubernetes-client" % "0.0.5",
"org.typelevel" %% "cats-effect" % "1.0.0",
"com.goyeau" %% "kubernetes-client" % "0.3.0",
"org.scala-lang" % "scala-reflect" % scalaVersion.value
) ++
scalaJsReact.value ++
Expand Down Expand Up @@ -140,6 +136,7 @@ lazy val `orkestra-integration-tests` = crossProject(JVMPlatform, JSPlatform)
version ~= (_.replace('+', '-')),
buildInfoPackage := s"${organization.value}.integration.tests",
buildInfoKeys += "artifactName" -> artifact.value.name,
scalaJSUseMainModuleInitializer := true,
libraryDependencies ++= scalaTest.value,
publishArtifact := false,
publishLocal := {}
Expand Down Expand Up @@ -224,7 +221,7 @@ lazy val react = Def.setting {
}

lazy val elastic4s = Def.setting {
val elastic4sVersion = "6.4.0"
val elastic4sVersion = "6.5.0"
Seq(
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,
"com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package tech.orkestra

import cats.effect.{ExitCode, IO, IOApp}
import tech.orkestra.board.Board
import tech.orkestra.css.AppCss
import tech.orkestra.route.WebRouter
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import org.scalajs.dom

/**
* Mix in this trait to create the Orkestra server.
*/
trait OrkestraServer[F[_]] extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
implicit override def orkestraConfig: OrkestraConfig = ???
implicit override def kubernetesClient: KubernetesClient = ???
implicit override def elasticsearchClient: ElasticClient = ???

def board: Board

def main(args: Array[String]): Unit = {
def run(args: List[String]): IO[ExitCode] = IO {
AppCss.load()
WebRouter.router(board).renderIntoDOM(dom.document.getElementById(BuildInfo.projectName.toLowerCase))
ExitCode.Success
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package tech.orkestra

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives.{entity, _}
import autowire.Core
import cats.Applicative
import cats.effect.{ExitCode, IO, IOApp}
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import com.typesafe.scalalogging.Logger
Expand All @@ -14,24 +14,24 @@ import io.circe.Json
import io.circe.generic.auto._
import io.circe.shapes._
import io.circe.java8.time._
import scalajs.html.scripts
import tech.orkestra.utils.AkkaImplicits._
import tech.orkestra.job.Job
import tech.orkestra.kubernetes.Kubernetes
import tech.orkestra.utils.{AutowireServer, Elasticsearch}
import scalajs.html.scripts

/**
* Mix in this trait to create the Orkestra job server.
*/
trait OrkestraServer[F[_]] extends OrkestraPlugin {
trait OrkestraServer extends IOApp with OrkestraPlugin[IO] {
private lazy val logger = Logger(getClass)
override lazy val F: Applicative[IO] = implicitly[Applicative[IO]]
implicit override lazy val orkestraConfig: OrkestraConfig = OrkestraConfig.fromEnvVars()
implicit override lazy val kubernetesClient: KubernetesClient = Kubernetes.client
implicit override lazy val elasticsearchClient: ElasticClient = Elasticsearch.client

def jobs: Set[Job[F, _, _]]
def jobs: Set[Job[IO, _, _]]

lazy val routes =
def routes(implicit kubernetesClient: KubernetesClient[IO]) =
pathPrefix("assets" / Remaining) { file =>
encodeResponse {
getFromResource(s"public/$file")
Expand Down Expand Up @@ -69,32 +69,30 @@ trait OrkestraServer[F[_]] extends OrkestraPlugin {
path(OrkestraConfig.commonSegment / Segments) { segments =>
entity(as[Json]) { json =>
val body = AutowireServer.read[Map[String, Json]](json)
val request = AutowireServer.route[CommonApi](CommonApiServer())(Core.Request(segments, body))
val request = AutowireServer.route[CommonApi](CommonApiServer[IO]())(Core.Request(segments, body))
onSuccess(request)(json => complete(json))
}
}
}

def main(args: Array[String]): Unit =
Await.result(
orkestraConfig.runInfoMaybe.fold {
for {
_ <- Future(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init()
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart()
_ <- Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)
} yield ()
} { runInfo =>
for {
_ <- Future(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ()
},
Duration.Inf
)
def run(args: List[String]): IO[ExitCode] = Kubernetes.client[IO].use { implicit kubernetesClient =>
orkestraConfig.runInfoMaybe.fold {
for {
_ <- IO.pure(logger.info("Initializing Elasticsearch"))
_ <- Elasticsearch.init[IO]
_ = logger.info("Starting master Orkestra")
_ <- onMasterStart(kubernetesClient)
_ <- IO.fromFuture(IO(Http().bindAndHandle(routes, "0.0.0.0", orkestraConfig.port)))
} yield ExitCode.Success
} { runInfo =>
for {
_ <- IO.delay(logger.info(s"Running job $runInfo"))
_ <- onJobStart(runInfo)
_ <- jobs
.find(_.board.id == runInfo.jobId)
.getOrElse(throw new IllegalArgumentException(s"No job found for id ${runInfo.jobId}"))
.start(runInfo)
} yield ExitCode.Success
}
}
}
59 changes: 35 additions & 24 deletions orkestra-core/src/main/scala/tech/orkestra/CommonApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package tech.orkestra
import java.io.IOException
import java.time.Instant

import cats.Applicative
import cats.effect.ConcurrentEffect
import cats.implicits._

import scala.concurrent.Future
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.cats.effect.instances._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.ElasticClient
Expand All @@ -26,12 +31,12 @@ object CommonApi {
val client = AutowireClient(OrkestraConfig.commonSegment)[CommonApi]
}

case class CommonApiServer()(
implicit orkestraConfig: OrkestraConfig,
kubernetesClient: KubernetesClient,
case class CommonApiServer[F[_]: ConcurrentEffect]()(
implicit
orkestraConfig: OrkestraConfig,
kubernetesClient: KubernetesClient[F],
elasticsearchClient: ElasticClient
) extends CommonApi {
import tech.orkestra.utils.AkkaImplicits._

override def logs(runId: RunId, page: Page[(Instant, Int)]): Future[Seq[LogLine]] =
elasticsearchClient
Expand All @@ -55,28 +60,34 @@ case class CommonApiServer()(
.size(math.abs(page.size))
)
.map(response => response.fold(throw new IOException(response.error.reason))(_.to[LogLine]))
.unsafeToFuture()

override def runningJobs(): Future[Seq[Run[HNil, Unit]]] =
for {
runInfos <- kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list()
.map(_.items.map(RunInfo.fromKubeJob))
ConcurrentEffect[F]
.toIO {
for {
runInfo <- kubernetesClient.jobs
.namespace(orkestraConfig.namespace)
.list
.map(_.items.map(RunInfo.fromKubeJob))

runs <- if (runInfos.nonEmpty)
elasticsearchClient
.execute(
search(HistoryIndex.index)
.query(
boolQuery.filter(
termsQuery("runInfo.runId", runInfos.map(_.runId.value)),
termsQuery("runInfo.jobId", runInfos.map(_.jobId.value))
)
runs <- if (runInfo.nonEmpty)
elasticsearchClient
.execute(
search(HistoryIndex.index)
.query(
boolQuery.filter(
termsQuery("runInfo.runId", runInfo.map(_.runId.value)),
termsQuery("runInfo.jobId", runInfo.map(_.jobId.value))
)
)
.sortBy(fieldSort("triggeredOn").desc(), fieldSort("_id").desc())
.size(1000)
)
.sortBy(fieldSort("triggeredOn").desc(), fieldSort("_id").desc())
.size(1000)
)
.map(response => response.fold(throw new IOException(response.error.reason))(_.to[Run[HNil, Unit]]))
else Future.successful(Seq.empty)
} yield runs
.map(response => response.fold(throw new IOException(response.error.reason))(_.to[Run[HNil, Unit]]))
.to[F]
else Applicative[F].pure(IndexedSeq.empty[Run[HNil, Unit]])
} yield runs
}
.unsafeToFuture()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import scala.io.Source
import com.sksamuel.elastic4s.http.ElasticProperties
import io.circe.generic.auto._
import io.circe.parser._
import org.http4s.Uri
import tech.orkestra.model.{EnvRunInfo, RunId, RunInfo}

case class OrkestraConfig(
elasticsearchProperties: ElasticProperties,
workspace: String = OrkestraConfig.defaultWorkspace,
port: Int = OrkestraConfig.defaultBindPort,
runInfoMaybe: Option[RunInfo] = None,
kubeUri: String,
kubeUri: Uri,
namespace: String,
podName: String,
basePath: String = OrkestraConfig.defaultBasePath
Expand All @@ -34,7 +35,8 @@ object OrkestraConfig {
fromEnvVar("RUN_INFO").map { runInfoJson =>
decode[EnvRunInfo](runInfoJson).fold(throw _, runInfo => RunInfo(runInfo.jobId, runInfo.runId.getOrElse(jobUid)))
},
fromEnvVar("KUBE_URI").getOrElse(throw new IllegalStateException("ORKESTRA_KUBE_URI should be set")),
fromEnvVar("KUBE_URI")
.fold(throw new IllegalStateException("ORKESTRA_KUBE_URI should be set"))(Uri.unsafeFromString),
fromEnvVar("NAMESPACE").getOrElse(throw new IllegalStateException("ORKESTRA_NAMESPACE should be set")),
fromEnvVar("POD_NAME").getOrElse(throw new IllegalStateException("ORKESTRA_POD_NAME should be set")),
fromEnvVar("BASEPATH").getOrElse(defaultBasePath)
Expand Down
10 changes: 5 additions & 5 deletions orkestra-core/src/main/scala/tech/orkestra/OrkestraPlugin.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package tech.orkestra

import scala.concurrent.Future
import cats.Applicative
import com.goyeau.kubernetes.client.KubernetesClient
import com.sksamuel.elastic4s.http.ElasticClient
import tech.orkestra.model.RunInfo

trait OrkestraPlugin {
trait OrkestraPlugin[F[_]] {
implicit protected def F: Applicative[F]
implicit protected def orkestraConfig: OrkestraConfig
implicit protected def kubernetesClient: KubernetesClient
implicit protected def elasticsearchClient: ElasticClient

def onMasterStart(): Future[Unit] = Future.unit
def onJobStart(runInfo: RunInfo): Future[Unit] = Future.unit
def onMasterStart(kubernetesClient: KubernetesClient[F]): F[Unit] = Applicative[F].unit
def onJobStart(runInfo: RunInfo): F[Unit] = Applicative[F].unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ trait JobBoard[Parameters <: HList] extends Board {
}

private[orkestra] object Api {
def router(apiServer: Api)(
implicit ec: ExecutionContext,
encoderP: Encoder[Parameters],
decoderP: Decoder[Parameters]
) =
def router(apiServer: Api)(implicit ec: ExecutionContext, decoder: Decoder[Parameters]) =
AutowireServer.route[Api](apiServer)

val client = AutowireClient(s"${OrkestraConfig.jobSegment}/${id.value}")[Api]
Expand Down
8 changes: 6 additions & 2 deletions orkestra-core/src/main/scala/tech/orkestra/input/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ case class Text[T: Encoder: Decoder](name: String, default: Option[T] = None) ex
<.span(name),
<.input.text(
^.key := id.name,
^.value := state.get(id).map(_.asInstanceOf[T]) // scalafix:ok
^.value := state
.get(id)
.map(_.asInstanceOf[T]) // scalafix:ok
.orElse(default)
.fold("")(implicitly[Encoder[T]].apply(_)),
^.onChange ==> modValue
Expand Down Expand Up @@ -88,7 +90,9 @@ case class Select[Entry <: EnumEntry](name: String, enum: Enum[Entry], default:
<.span(name),
<.select(
^.key := id.name,
^.value := state.get(id).map(_.asInstanceOf[Entry]) // scalafix:ok
^.value := state
.get(id)
.map(_.asInstanceOf[Entry]) // scalafix:ok
.orElse(default)
.map(_.entryName)
.getOrElse(disabled),
Expand Down
Loading

0 comments on commit bcd96e0

Please sign in to comment.