From 6628c8fe98aa5c1cd9bc1de4998d2d0d3d1a504d Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Mon, 3 Sep 2018 21:56:47 +0900 Subject: [PATCH 01/21] Add delete collection methods (#210) * Add delete collection methods * Rename deleteCollection methods to deleteAll --- client/src/it/scala/skuber/PodSpec.scala | 75 ++++++++++++------- .../src/main/scala/skuber/api/package.scala | 26 +++++++ 2 files changed, 72 insertions(+), 29 deletions(-) diff --git a/client/src/it/scala/skuber/PodSpec.scala b/client/src/it/scala/skuber/PodSpec.scala index 314c9714..19e02c1c 100644 --- a/client/src/it/scala/skuber/PodSpec.scala +++ b/client/src/it/scala/skuber/PodSpec.scala @@ -1,21 +1,24 @@ package skuber -import org.scalatest.Matchers +import org.scalatest.{BeforeAndAfterAll, Matchers} import org.scalatest.concurrent.Eventually import skuber.json.format._ import scala.concurrent.duration._ import scala.concurrent.Await -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} -import akka.event.Logging -import akka.stream.scaladsl._ -import akka.util.ByteString -import scala.concurrent.Future - -class PodSpec extends K8SFixture with Eventually with Matchers { +class PodSpec extends K8SFixture with Eventually with Matchers with BeforeAndAfterAll { val nginxPodName: String = java.util.UUID.randomUUID().toString + val defaultLabels = Map("app" -> this.suiteName) + + override def afterAll() = { + val k8s = k8sInit + val requirements = defaultLabels.toSeq.map { case (k, v) => LabelSelector.IsEqualRequirement(k, v) } + val labelSelector = LabelSelector(requirements: _*) + Await.result(k8s.deleteAllSelected[PodList](labelSelector), 5 seconds) + } behavior of "Pod" @@ -33,24 +36,24 @@ class PodSpec extends K8SFixture with Eventually with Matchers { it should "check for newly created pod and container to be ready" in { k8s => eventually(timeout(100 seconds), interval(3 seconds)) { - val retrievePod=k8s.get[Pod](nginxPodName) - val podRetrieved=Await.ready(retrievePod, 2 seconds).value.get - val podStatus=podRetrieved.get.status.get + val retrievePod = k8s.get[Pod](nginxPodName) + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get + val podStatus = podRetrieved.get.status.get val nginxContainerStatus = podStatus.containerStatuses(0) podStatus.phase should contain(Pod.Phase.Running) - nginxContainerStatus.name should be("nginx") + nginxContainerStatus.name should be(nginxPodName) nginxContainerStatus.state.get shouldBe a[Container.Running] - val isUnschedulable=podStatus.conditions.exists { c => - c._type=="PodScheduled" && c.status=="False" && c.reason==Some("Unschedulable") + val isUnschedulable = podStatus.conditions.exists { c => + c._type == "PodScheduled" && c.status == "False" && c.reason == Some("Unschedulable") } - val isScheduled=podStatus.conditions.exists { c => - c._type=="PodScheduled" && c.status=="True" + val isScheduled = podStatus.conditions.exists { c => + c._type == "PodScheduled" && c.status == "True" } - val isInitialised=podStatus.conditions.exists { c => - c._type=="Initialized" && c.status=="True" + val isInitialised = podStatus.conditions.exists { c => + c._type == "Initialized" && c.status == "True" } - val isReady=podStatus.conditions.exists { c => - c._type=="Ready" && c.status=="True" + val isReady = podStatus.conditions.exists { c => + c._type == "Ready" && c.status == "True" } assert(isScheduled) assert(isInitialised) @@ -62,23 +65,37 @@ class PodSpec extends K8SFixture with Eventually with Matchers { k8s.delete[Pod](nginxPodName).map { _ => eventually(timeout(100 seconds), interval(3 seconds)) { val retrievePod = k8s.get[Pod](nginxPodName) - val podRetrieved=Await.ready(retrievePod, 2 seconds).value.get + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get podRetrieved match { case s: Success[_] => assert(false) case Failure(ex) => ex match { - case ex: K8SException if ex.status.code.contains(404) => assert(true) - case _ => assert(false) - } + case ex: K8SException if ex.status.code.contains(404) => assert(true) + case _ => assert(false) + } } } } } - def getNginxContainer(version: String): Container = Container(name = "nginx", image = "nginx:" + version).exposePort(80) + it should "delete selected pods" in { k8s => + for { + _ <- k8s.create(getNginxPod(nginxPodName + "-foo", "1.7.9", labels = Map("foo" -> "1"))) + _ <- k8s.create(getNginxPod(nginxPodName + "-bar", "1.7.9", labels = Map("bar" -> "2"))) + _ <- k8s.deleteAllSelected[PodList](LabelSelector(LabelSelector.ExistsRequirement("foo"))) + } yield eventually(timeout(100 seconds), interval(3 seconds)) { + val retrievePods = k8s.list[PodList]() + val podsRetrieved = Await.result(retrievePods, 2 seconds) + val podNamesRetrieved = podsRetrieved.items.map(_.name) + assert(!podNamesRetrieved.contains(nginxPodName + "-foo") && podNamesRetrieved.contains(nginxPodName + "-bar")) + } + } + + def getNginxContainer(name: String, version: String): Container = Container(name = name, image = "nginx:" + version).exposePort(80) - def getNginxPod(name: String, version: String): Pod = { - val nginxContainer = getNginxContainer(version) - val nginxPodSpec = Pod.Spec(containers=List((nginxContainer))) - Pod.named(nginxPodName).copy(spec=Some(nginxPodSpec)) + def getNginxPod(name: String, version: String, labels: Map[String, String] = Map()): Pod = { + val nginxContainer = getNginxContainer(name, version) + val nginxPodSpec = Pod.Spec(containers = List((nginxContainer))) + val podMeta=ObjectMeta(name = name, labels = labels ++ defaultLabels) + Pod(metadata = podMeta, spec = Some(nginxPodSpec)) } } diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index da7e88c3..f9306335 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -556,6 +556,32 @@ package object client { } yield () } + def deleteAll[L <: ListResource[_]]()( + implicit fmt: Format[L], rd: ResourceDefinition[L], lc: LoggingContext=RequestLoggingContext()): Future[L] = + { + _deleteAll[L](rd, None) + } + + def deleteAllSelected[L <: ListResource[_]](labelSelector: LabelSelector)( + implicit fmt: Format[L], rd: ResourceDefinition[L], lc: LoggingContext=RequestLoggingContext()): Future[L] = + { + _deleteAll[L](rd, Some(labelSelector)) + } + + private def _deleteAll[L <: ListResource[_]](rd: ResourceDefinition[_], maybeLabelSelector: Option[LabelSelector])( + implicit fmt: Format[L], lc: LoggingContext=RequestLoggingContext()): Future[L] = + { + val queryOpt = maybeLabelSelector map { ls => + Uri.Query("labelSelector" -> ls.toString) + } + if (log.isDebugEnabled) { + val lsInfo = maybeLabelSelector map { ls => s" with label selector '${ls.toString}'" } getOrElse "" + logDebug(s"[Delete request: resources of kind '${rd.spec.names.kind}'${lsInfo}") + } + val req = buildRequest(HttpMethods.DELETE, rd, None, query = queryOpt) + makeRequestReturningListResource[L](req) + } + def getPodLogSource(name: String, queryParams: Pod.LogQueryParams, namespace: Option[String] = None)( implicit lc: LoggingContext=RequestLoggingContext()): Future[Source[ByteString, _]] = { From 0f713e089ccaf8676de63b1b5f1b632b97fef91e Mon Sep 17 00:00:00 2001 From: Chris Baker Date: Mon, 3 Sep 2018 09:02:50 -0400 Subject: [PATCH 02/21] added "Delete" to skuber.PersistentVolume.RelaimPolicy, per #207 (#208) --- client/src/main/scala/skuber/PersistentVolume.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/scala/skuber/PersistentVolume.scala b/client/src/main/scala/skuber/PersistentVolume.scala index 9d8402b6..e2af3162 100644 --- a/client/src/main/scala/skuber/PersistentVolume.scala +++ b/client/src/main/scala/skuber/PersistentVolume.scala @@ -45,7 +45,7 @@ object PersistentVolume { object ReclaimPolicy extends Enumeration { type ReclaimPolicy = Value - val Recycle, Retain = Value + val Recycle, Retain, Delete = Value } case class Spec( From 91a9d020bfd5fbdde0b270a5ed8690a5652316b1 Mon Sep 17 00:00:00 2001 From: Ihor Antonov Date: Wed, 5 Sep 2018 00:17:18 -0400 Subject: [PATCH 03/21] Object namespace has higher priority when creating (#211) --- .../src/it/scala/skuber/NamespaceSpec.scala | 97 +++++++++++++------ client/src/main/scala/skuber/ConfigMap.scala | 2 + client/src/main/scala/skuber/Namespace.scala | 4 +- .../src/main/scala/skuber/api/package.scala | 29 +++--- docs/Examples.md | 30 ++++++ 5 files changed, 117 insertions(+), 45 deletions(-) diff --git a/client/src/it/scala/skuber/NamespaceSpec.scala b/client/src/it/scala/skuber/NamespaceSpec.scala index 40e5ccbc..c80b616b 100644 --- a/client/src/it/scala/skuber/NamespaceSpec.scala +++ b/client/src/it/scala/skuber/NamespaceSpec.scala @@ -1,9 +1,11 @@ package skuber +import java.util.UUID.randomUUID + import org.scalatest.Matchers import org.scalatest.concurrent.Eventually +import json.format.{namespaceFormat, podFormat} -import json.format.{namespaceFormat,podFormat} import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} @@ -13,27 +15,49 @@ import scala.util.{Failure, Success} */ class NamespaceSpec extends K8SFixture with Eventually with Matchers { - val nginxPodName: String = java.util.UUID.randomUUID().toString + val nginxPodName1: String = randomUUID().toString + val nginxPodName2: String = randomUUID().toString + + val namespace1Name: String = "namespace1" + val namespace2Name: String = "namespace2" + val testNamespaces = List(namespace1Name,namespace2Name) - val namespace1Name="namespace1" + val pod1: Pod = getNginxPod(namespace1Name,nginxPodName1) + val pod2: Pod = getNginxPod(namespace2Name, nginxPodName2) behavior of "Namespace" - it should "create a namespace" in { k8s => - k8s.create(Namespace.forName(namespace1Name)).map { ns => - assert(ns.name == namespace1Name) + it should "create namespace1" in { k8s => + k8s.create(Namespace(namespace1Name)).map { ns => assert(ns.name == namespace1Name) } + } + + it should "create namespace2" in { k8s => + k8s.create(Namespace(namespace2Name)).map { ns => assert(ns.name == namespace2Name)} + } + + it should "create pod1 in namespace1" in { k8s => + k8s.usingNamespace(namespace1Name).create(pod1) + .map { p => + assert(p.name == nginxPodName1) + assert(p.namespace == namespace1Name) } } - it should "create a pod in the newly created namespace" in { k8s => - k8s.usingNamespace(namespace1Name).create(getNginxPod(namespace1Name, nginxPodName, "1.7.9")) map { p => - assert(p.name == nginxPodName) - assert(p.namespace == namespace1Name) + it should "honor namespace precedence hierarchy: object > client" in { k8s => + k8s.usingNamespace(namespace1Name).create(pod2).map { p => + assert(p.name == nginxPodName2) + assert(p.namespace == namespace2Name) } } - it should "not find the newly created pod in the default namespace" in { k8s => - val retrievePod = k8s.get[Pod](nginxPodName) + it should "find the pod2 in namespace2" in { k8s => + k8s.usingNamespace(namespace2Name).get[Pod](nginxPodName2) map { p => + assert(p.name == nginxPodName2) + } + } + + it should "not find pod1 in the default namespace" in { k8s => + val retrievePod = k8s.get[Pod](nginxPodName1) val podRetrieved=Await.ready(retrievePod, 2 seconds).value.get podRetrieved match { case s: Success[_] => assert(false) @@ -44,34 +68,43 @@ class NamespaceSpec extends K8SFixture with Eventually with Matchers { } } - - it should "find the newly created pod in the newly created namespace" in { k8s => - k8s.usingNamespace(namespace1Name).get[Pod](nginxPodName) map { p => - assert(p.name == nginxPodName) + it should "find the pod1 in namespace1" in { k8s => + k8s.usingNamespace(namespace1Name).get[Pod](nginxPodName1) map { p => + assert(p.name == nginxPodName1) } } - it should "delete the namespace" in { k8s => - val deleteNs = k8s.delete[Namespace](namespace1Name) - eventually(timeout(100 seconds), interval(3 seconds)) { - val retrieveNs = k8s.get[Namespace](namespace1Name) - val nsRetrieved = Await.ready(retrieveNs, 2 seconds).value.get - nsRetrieved match { - case s: Success[_] => assert(false) - case Failure(ex) => ex match { - case ex: K8SException if ex.status.code.contains(404) => assert(true) - case _ => assert(false) - } - } + it should "delete all test namespaces" in { k8s => + val t = timeout(100.seconds) + val i = interval(3.seconds) + // Delete namespaces + testNamespaces.foreach { ns => k8s.delete[Namespace](ns) } + + eventually(t, i) { + testNamespaces.map { ns => k8s.get[Namespace](ns) } + + assert(!testNamespaces + .map { n => k8s.get[Namespace](n) } // get every namespace + .map { f => Await.ready(f, 2.seconds).value.get } // await completion of each get + .map { // find out if deletion was successful + case s: Success[_] => false + case Failure(ex) => ex match { + case ex: K8SException if ex.status.code.contains(404) + => true + case _ => false + } + }.reduceLeft(_ && _)) // consider success only if all namespaces were deleted } } - def getNginxContainer(version: String): Container = Container(name = "nginx", image = "nginx:" + version).exposePort(80) + def getNginxContainer(version: String): Container = + Container(name = "nginx", image = "nginx:" + version) + .exposePort(port = 80) - def getNginxPod(namespace: String, name: String, version: String): Pod = { + def getNginxPod(namespace: String, name: String, version: String = "1.7.8"): Pod = { val nginxContainer = getNginxContainer(version) - val nginxPodSpec = Pod.Spec(containers=List((nginxContainer))) - val podMeta=ObjectMeta(namespace=namespace, name = name) + val nginxPodSpec = Pod.Spec(containers=List(nginxContainer)) + val podMeta = ObjectMeta(namespace=namespace, name = name) Pod(metadata=podMeta, spec=Some(nginxPodSpec)) } } diff --git a/client/src/main/scala/skuber/ConfigMap.scala b/client/src/main/scala/skuber/ConfigMap.scala index b6692840..80262ffc 100644 --- a/client/src/main/scala/skuber/ConfigMap.scala +++ b/client/src/main/scala/skuber/ConfigMap.scala @@ -3,6 +3,8 @@ package skuber /** * @author Cory Klein */ + + case class ConfigMap(val kind: String ="ConfigMap", override val apiVersion: String = v1, val metadata: ObjectMeta, diff --git a/client/src/main/scala/skuber/Namespace.scala b/client/src/main/scala/skuber/Namespace.scala index 5c56b055..b75e5fdb 100644 --- a/client/src/main/scala/skuber/Namespace.scala +++ b/client/src/main/scala/skuber/Namespace.scala @@ -48,6 +48,6 @@ object Namespace { lazy val all = Namespace.forName("") lazy val none = all def forName(label: String) : Namespace = Namespace(metadata=ObjectMeta(name=label)) - def from(meta:ObjectMeta) : Namespace = Namespace(metadata=meta) - def apply(label: String) : Namespace = Namespace(metadata=ObjectMeta(name=label)) + def from(meta:ObjectMeta) : Namespace = Namespace(metadata=meta) + def apply(label: String) : Namespace = Namespace(metadata=ObjectMeta(name=label)) } \ No newline at end of file diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index f9306335..a09d35ec 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -376,7 +376,7 @@ package object client { // if this is a POST we don't include the resource name in the URL val nameComponent: Option[String] = method match { case HttpMethods.POST => None - case _ => Some(obj.name) + case _ => Some(obj.name) } modify(method, obj, nameComponent) } @@ -384,11 +384,16 @@ package object client { private[skuber] def modify[O <: ObjectResource](method: HttpMethod, obj: O, nameComponent: Option[String])( implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Future[O] = { + // Namespace set in the object metadata (if set) has higher priority than that of the + // request context (see Issue #204) + val targetNamespace = if (obj.metadata.namespace.isEmpty) namespaceName else obj.metadata.namespace + logRequestObjectDetails(method, obj) val marshal = Marshal(obj) for { - requestEntity <- marshal.to[RequestEntity] - httpRequest = buildRequest(method, rd, nameComponent).withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) + requestEntity <- marshal.to[RequestEntity] + httpRequest = buildRequest(method, rd, nameComponent, namespace = targetNamespace) + .withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) newOrUpdatedResource <- makeRequestReturningObjectResource[O](httpRequest) } yield newOrUpdatedResource } @@ -549,10 +554,11 @@ package object client { val marshalledOptions = Marshal(options) for { requestEntity <- marshalledOptions.to[RequestEntity] - request = buildRequest(HttpMethods.DELETE, rd, Some(name)).withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) - response <- invoke(request) - _ <- checkResponseStatus(response) - _ <- ignoreResponseBody(response) + request = buildRequest(HttpMethods.DELETE, rd, Some(name)) + .withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) + response <- invoke(request) + _ <- checkResponseStatus(response) + _ <- ignoreResponseBody(response) } yield () } @@ -594,7 +600,7 @@ package object client { } val nameComponent=s"${name}/log" val rd = implicitly[ResourceDefinition[Pod]] - val request=buildRequest(HttpMethods.GET, rd, Some(nameComponent), query, false, targetNamespace) + val request = buildRequest(HttpMethods.GET, rd, Some(nameComponent), query, false, targetNamespace) invoke(request).map { response => response.entity.dataBytes } @@ -684,11 +690,12 @@ package object client { def updateScale[O <: ObjectResource](objName: String, scale: Scale)( implicit rd: ResourceDefinition[O], sc: Scale.SubresourceSpec[O], lc:LoggingContext=RequestLoggingContext()): Future[Scale] = { - implicit val dispatcher=actorSystem.dispatcher + implicit val dispatcher = actorSystem.dispatcher val marshal = Marshal(scale) for { - requestEntity <- marshal.to[RequestEntity] - httpRequest = buildRequest(HttpMethods.PUT, rd, Some(s"${objName}/scale")).withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) + requestEntity <- marshal.to[RequestEntity] + httpRequest = buildRequest(HttpMethods.PUT, rd, Some(s"${objName}/scale")) + .withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) scaledResource <- makeRequestReturningObjectResource[Scale](httpRequest) } yield scaledResource } diff --git a/docs/Examples.md b/docs/Examples.md index cdcbc3a8..25804e51 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -1,5 +1,10 @@ # Skuber usage examples +Skuber is built on top of Akka HTTP and therefore it is non-blocking and concurrent by default. +Almost all requests return a Future, and you need to write a little bit of extra code if you want quick +experiments in a single-threaded environment (like Ammonite REPL, or simple tests) +It all boils down to either using Await or onComplete - see examples below. + ## Basic imports ```scala @@ -43,6 +48,31 @@ listPodsRequest.onComplete { } ``` +## List Namespaces + +```scala +import scala.concurrent.Await +import scala.concurrent.duration._ + +val list = Await.result(k8s.list[NamespaceList], 10.seconds).items.map(i => i.name) +// res19: List[String] = List("default", "kube-public", "kube-system", "namespace2", "ns-1") + +``` + + +## Create Pod + +```scala +import scala.concurrent.Await +import scala.concurrent.duration._ + +val podSpec = Pod.Spec(List(Container(name = "nginx", image = "nginx"))) +val pod = Pod("nginxpod", podSpec) +val podFuture = k8s.create(pod) +// handle future as you see fit +``` + + ## Create deployment This example creates a nginx service (accessed via port 30001 on each Kubernetes cluster node) that is backed by a deployment of five nginx replicas. From 51dc170e1d49d950511f641ae749b67b2d43dd73 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Thu, 6 Sep 2018 02:30:55 +0900 Subject: [PATCH 04/21] Support pod exec (#195) * Implement pod exec * Add pod exec tests * Add pod exec examples * Set status code of websocket upgrade response * Assert by status codes instead of messages --- client/src/it/scala/skuber/ExecSpec.scala | 116 ++++++++++++++++++ .../src/main/scala/skuber/api/package.scala | 106 +++++++++++++++- .../skuber/api/security/HTTPRequestAuth.scala | 16 ++- .../skuber/examples/exec/ExecExamples.scala | 87 +++++++++++++ 4 files changed, 317 insertions(+), 8 deletions(-) create mode 100644 client/src/it/scala/skuber/ExecSpec.scala create mode 100644 examples/src/main/scala/skuber/examples/exec/ExecExamples.scala diff --git a/client/src/it/scala/skuber/ExecSpec.scala b/client/src/it/scala/skuber/ExecSpec.scala new file mode 100644 index 00000000..d362893c --- /dev/null +++ b/client/src/it/scala/skuber/ExecSpec.scala @@ -0,0 +1,116 @@ +package skuber + +import akka.Done +import akka.stream.scaladsl.{Sink, Source} +import org.scalatest.{BeforeAndAfterAll, Matchers} +import org.scalatest.concurrent.Eventually +import skuber.json.format._ + +import scala.concurrent.duration._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future, Promise} + + +class ExecSpec extends K8SFixture with Eventually with Matchers with BeforeAndAfterAll { + val nginxPodName: String = java.util.UUID.randomUUID().toString + + behavior of "Exec" + + override def beforeAll(): Unit = { + super.beforeAll() + + val k8s = k8sInit + Await.result(k8s.create(getNginxPod(nginxPodName, "1.7.9")), 3 second) + // Let the pod running + Thread.sleep(3000) + k8s.close + } + + override def afterAll(): Unit = { + val k8s = k8sInit + Await.result(k8s.delete[Pod](nginxPodName), 3 second) + Thread.sleep(3000) + k8s.close + + super.afterAll() + } + + it should "execute a command in the running pod" in { k8s => + var output = "" + val stdout: Sink[String, Future[Done]] = Sink.foreach(output += _) + var errorOutput = "" + val stderr: Sink[String, Future[Done]] = Sink.foreach(errorOutput += _) + k8s.exec(nginxPodName, Seq("whoami"), maybeStdout = Some(stdout), maybeStderr = Some(stderr), maybeClose = Some(closeAfter(1 second))).map { _ => + assert(output == "root\n") + assert(errorOutput == "") + } + } + + it should "execute a command in the specified container of the running pod" in { k8s => + var output = "" + val stdout: Sink[String, Future[Done]] = Sink.foreach(output += _) + var errorOutput = "" + val stderr: Sink[String, Future[Done]] = Sink.foreach(errorOutput += _) + k8s.exec(nginxPodName, Seq("whoami"), maybeContainerName = Some("nginx"), + maybeStdout = Some(stdout), maybeStderr = Some(stderr), maybeClose = Some(closeAfter(1 second))).map { _ => + assert(output == "root\n") + assert(errorOutput == "") + } + } + + it should "execute a command that outputs to stderr in the running pod" in { k8s => + var output = "" + val stdout: Sink[String, Future[Done]] = Sink.foreach(output += _) + var errorOutput = "" + val stderr: Sink[String, Future[Done]] = Sink.foreach(errorOutput += _) + k8s.exec(nginxPodName, Seq("sh", "-c", "whoami >&2"), + maybeStdout = Some(stdout), maybeStderr = Some(stderr), maybeClose = Some(closeAfter(1 second))).map { _ => + assert(output == "") + assert(errorOutput == "root\n") + } + } + + it should "execute a command in an interactive shell of the running pod" in { k8s => + val stdin = Source.single("whoami\n") + var output = "" + val stdout: Sink[String, Future[Done]] = Sink.foreach(output += _) + var errorOutput = "" + val stderr: Sink[String, Future[Done]] = Sink.foreach(errorOutput += _) + k8s.exec(nginxPodName, Seq("sh"), maybeStdin = Some(stdin), + maybeStdout = Some(stdout), maybeStderr = Some(stderr), tty = true, maybeClose = Some(closeAfter(1 second))).map { _ => + assert(output == "# whoami\r\nroot\r\n# ") + assert(errorOutput == "") + } + } + + it should "throw an exception without stdin, stdout nor stderr in the running pod" in { k8s => + k8s.exec(nginxPodName, Seq("whoami")).failed.map { + case e: K8SException => + assert(e.status.code == Some(400)) + } + } + + it should "throw an exception against an unexisting pod" in { k8s => + k8s.exec(nginxPodName + "x", Seq("whoami")).failed.map { + case e: K8SException => + assert(e.status.code == Some(404)) + } + } + + def closeAfter(duration: Duration) = { + val promise = Promise[Unit]() + Future { + Thread.sleep(duration.toMillis) + promise.success(()) + } + promise + } + + def getNginxContainer(version: String): Container = Container(name = "nginx", image = "nginx:" + version).exposePort(80) + + def getNginxPod(name: String, version: String): Pod = { + val nginxContainer = getNginxContainer(version) + val nginxPodSpec = Pod.Spec(containers = List((nginxContainer))) + Pod.named(nginxPodName).copy(spec = Some(nginxPodSpec)) + } +} diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index a09d35ec..3354472a 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -1,22 +1,23 @@ package skuber.api -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.sys.SystemProperties import scala.util.{Failure, Success, Try} import java.net.URL import java.time.Instant import java.util.UUID -import akka.NotUsed +import akka.{Done, NotUsed} import akka.actor.ActorSystem import akka.event.Logging import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} -import akka.stream.Materializer -import akka.stream.scaladsl.{Flow, Source} +import akka.stream.{Materializer, SinkShape} +import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, Sink, Source} import akka.util.ByteString import com.typesafe.config.{Config, ConfigFactory} import javax.net.ssl.SSLContext @@ -731,6 +732,103 @@ package object client { } yield apiVersionResource.versions } + def exec(podName: String, command: Seq[String], maybeContainerName: Option[String] = None, + maybeStdin: Option[Source[String, _]] = None, + maybeStdout: Option[Sink[String, _]] = None, + maybeStderr: Option[Sink[String, _]] = None, + tty: Boolean = false, + maybeClose: Option[Promise[Unit]] = None): Future[Unit] = { + val containerPrintName = maybeContainerName.getOrElse("") + log.info(s"Trying to connect to container ${containerPrintName} of pod ${podName}") + + // Compose queries + var queries: Seq[(String, String)] = Seq( + "stdin" -> maybeStdin.isDefined.toString, + "stdout" -> maybeStdout.isDefined.toString, + "stderr" -> maybeStderr.isDefined.toString, + "tty" -> tty.toString + ) + maybeContainerName.foreach { containerName => + queries ++= Seq("container" -> containerName) + } + queries ++= command.map("command" -> _) + + // Determine scheme + val scheme = sslContext match { + case Some(_) => + "wss" + case None => + "ws" + } + + // Compose URI + val uri = Uri(clusterServer) + .withScheme(scheme) + .withPath(Uri.Path(s"/api/v1/namespaces/$namespaceName/pods/${podName}/exec")) + .withQuery(Uri.Query(queries: _*)) + + // Compose headers + var headers: List[HttpHeader] = List(RawHeader("Accept", "*/*")) + headers ++= HTTPRequestAuth.getAuthHeaders(requestAuth) + + // Convert `String` to `ByteString`, then prepend channel bytes + val source: Source[ws.Message, Promise[Option[ws.Message]]] = maybeStdin.getOrElse(Source.empty).viaMat(Flow[String].map { s => + ws.BinaryMessage(ByteString(0).concat(ByteString(s))) + })(Keep.right).concatMat(Source.maybe[ws.Message])(Keep.right) + + // Split the sink from websocket into stdout and stderr then remove first bytes which indicate channels + val sink: Sink[ws.Message, NotUsed] = Sink.fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ + + val partition = builder.add(Partition[ws.Message](2, { + case bm: ws.BinaryMessage.Strict if bm.data(0) == 1 => + 0 + case bm: ws.BinaryMessage.Strict if bm.data(0) == 2 => + 1 + })) + + def convertSink = Flow[ws.Message].map[String] { + case bm: ws.BinaryMessage.Strict => + bm.data.utf8String.substring(1) + } + + partition.out(0) ~> convertSink ~> maybeStdout.getOrElse(Sink.ignore) + partition.out(1) ~> convertSink ~> maybeStderr.getOrElse(Sink.ignore) + + SinkShape(partition.in) + }) + + // Make a flow from the source to the sink + val flow: Flow[ws.Message, ws.Message, Promise[Option[ws.Message]]] = Flow.fromSinkAndSourceMat(sink, source)(Keep.right) + + // upgradeResponse completes or fails when the connection succeeds or fails + // and promise controls the connection close timing + val (upgradeResponse, promise) = Http().singleWebSocketRequest(ws.WebSocketRequest(uri, headers, subprotocol = Option("channel.k8s.io")), flow) + + val connected = upgradeResponse.map { upgrade => + // just like a regular http request we can access response status which is available via upgrade.response.status + // status code 101 (Switching Protocols) indicates that server support WebSockets + if (upgrade.response.status == StatusCodes.SwitchingProtocols) { + Done + } else { + val message = upgrade.response.entity.toStrict(1000.millis).map(_.data.utf8String) + throw new K8SException(Status(message = + Some(s"Connection failed with status ${upgrade.response.status}"), + details = Some(message), code = Some(upgrade.response.status.intValue()))) + } + } + + val close = maybeClose.getOrElse(Promise.successful(())) + connected.foreach { _ => + log.info(s"Connected to container ${containerPrintName} of pod ${podName}") + close.future.foreach { _ => + log.info(s"Close the connection of container ${containerPrintName} of pod ${podName}") + promise.success(None) + } + } + Future.sequence(Seq(connected, close.future, promise.future)).map { _ => () } + } + def close: Unit = { isClosed = true diff --git a/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala b/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala index bfcdbd52..805ff186 100644 --- a/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala +++ b/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala @@ -1,6 +1,6 @@ package skuber.api.security -import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.{HttpHeader, HttpRequest} import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials, OAuth2BearerToken} import skuber.api.client._ @@ -10,10 +10,18 @@ import skuber.api.client._ object HTTPRequestAuth { def addAuth(request: HttpRequest, auth: AuthInfo) : HttpRequest = { + getAuthHeaders(auth).foreach { header => + // Add headers one by one because `addHeaders()` doesn't convert the instance type + request.addHeader(header) + } + request + } + + def getAuthHeaders(auth: AuthInfo) : Seq[HttpHeader] = { auth match { - case NoAuth | _: CertAuth => request - case BasicAuth(user, password) => request.addHeader(Authorization(BasicHttpCredentials(user,password))) - case auth: AccessTokenAuth => request.addHeader(Authorization(OAuth2BearerToken(auth.accessToken))) + case NoAuth | _: CertAuth => Seq() + case BasicAuth(user, password) => Seq(Authorization(BasicHttpCredentials(user,password))) + case auth: AccessTokenAuth => Seq(Authorization(OAuth2BearerToken(auth.accessToken))) } } } diff --git a/examples/src/main/scala/skuber/examples/exec/ExecExamples.scala b/examples/src/main/scala/skuber/examples/exec/ExecExamples.scala new file mode 100644 index 00000000..a556639e --- /dev/null +++ b/examples/src/main/scala/skuber/examples/exec/ExecExamples.scala @@ -0,0 +1,87 @@ +package skuber.examples.exec + +import akka.{Done, NotUsed} +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import skuber._ + +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration.Duration.Inf +import skuber.json.format._ + +object ExecExamples extends App { + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + implicit val dispatcher = system.dispatcher + + val k8s = k8sInit + k8s.logConfig + + println("Executing commands in pods ==>") + + val podName = "sleep" + val containerName = "sleep" + val sleepContainer = Container(name = containerName, image = "busybox", command = List("sh", "-c", "trap exit TERM; sleep 99999 & wait")) + val sleepPod = Pod(podName, Pod.Spec().addContainer(sleepContainer)) + + val terminalReady: Promise[Unit] = Promise() + + // Just print stdout and signal when the terminal gets ready + val sink: Sink[String, Future[Done]] = Sink.foreach { + case s => + print(s) + if (s.startsWith("/ #")) { + terminalReady.success(()) + } + } + + // Execute `ps aux` when the terminal gets ready + val source: Source[String, NotUsed] = Source.fromFuture(terminalReady.future.map { _ => + "ps aux\n" + }) + + // Wait for a while to ensure outputs + def close: Promise[Unit] = { + val promise = Promise[Unit]() + Future { + Thread.sleep(1000) + promise.success(()) + } + promise + } + + val fut = for { + // Create the sleep pod if not present + _ <- k8s.getOption[Pod](podName).flatMap { + case Some(pod) => Future.successful() + case None => + k8s.create(sleepPod).map { _ => + Thread.sleep(3000) + } + } + // Simulate kubectl exec + _ <- { + println("`kubectl exec ps aux`") + k8s.exec(podName, Seq("ps", "aux"), maybeStdout = Some(sink), maybeClose = Some(close)) + } + // Simulate kubectl exec -it + _ <- { + println("`kubectl -it exec sh` -> `ps aux`") + k8s.exec(podName, Seq("sh"), maybeStdout = Some(sink), maybeStdin = Some(source), tty = true, maybeClose = Some(close)) + } + } yield () + + // Clean up + fut.onComplete { _ => + println("\nFinishing up") + k8s.delete[Pod]("sleep") + k8s.close + system.terminate().foreach { f => + System.exit(0) + } + } + + Await.result(fut, Inf) +} From 30c7b99bdcd1ff0907cdcdc26d39164e5decfba6 Mon Sep 17 00:00:00 2001 From: Carsten Date: Thu, 6 Sep 2018 12:00:29 +0200 Subject: [PATCH 05/21] Use local HttpsConnectionContext instead of overwriting default (#205) --- client/src/main/scala/skuber/api/package.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index 3354472a..7de8c75c 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -990,10 +990,12 @@ package object client { } val sslContext = TLS.establishSSLContext(k8sContext) - sslContext foreach { ssl => - val httpsContext = ConnectionContext.https(ssl, None,Some(scala.collection.immutable.Seq("TLSv1.2", "TLSv1")), None, None) - Http().setDefaultClientHttpsContext(httpsContext) - } + + val connectionContext = sslContext + .map { ssl => + ConnectionContext.https(ssl, enabledProtocols = Some(scala.collection.immutable.Seq("TLSv1.2", "TLSv1"))) + } + .getOrElse(Http().defaultClientHttpsContext) val theNamespaceName = k8sContext.namespace.name match { case "" => "default" @@ -1008,9 +1010,9 @@ package object client { val requestInvoker = (request: HttpRequest, watch: Boolean) => { if (!watch) - Http().singleRequest(request) + Http().singleRequest(request, connectionContext = connectionContext) else - Http().singleRequest(request, settings = watchSettings) + Http().singleRequest(request, settings = watchSettings, connectionContext = connectionContext) } new RequestContext( From 7737df7af858a175b95f87f0cb6ce249243f4d19 Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Sun, 23 Sep 2018 16:01:36 +0100 Subject: [PATCH 06/21] Add support for PodDisruptionBudget (#213) * readyReplicas field on Status on Deployment V1 is misspelt * Add support for PodDisruptionBudget * Update IT tests to validate response * Remove scaler * Fix assertion on PodDisruptionBudget IT --- .../skuber/PodDisruptionBudgetSpec.scala | 76 ++++++++++++++++ .../policy/v1beta1/PodDisruptionBudget.scala | 86 +++++++++++++++++++ .../scala/skuber/policy/v1beta1/package.scala | 8 ++ .../resources/examplePodDisruptionBudget.json | 1 + .../v1beta1/PodDisruptionBudgetSpec.scala | 34 ++++++++ 5 files changed, 205 insertions(+) create mode 100644 client/src/it/scala/skuber/PodDisruptionBudgetSpec.scala create mode 100644 client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala create mode 100644 client/src/main/scala/skuber/policy/v1beta1/package.scala create mode 100644 client/src/test/resources/examplePodDisruptionBudget.json create mode 100644 client/src/test/scala/skuber/policy/v1beta1/PodDisruptionBudgetSpec.scala diff --git a/client/src/it/scala/skuber/PodDisruptionBudgetSpec.scala b/client/src/it/scala/skuber/PodDisruptionBudgetSpec.scala new file mode 100644 index 00000000..179a8b49 --- /dev/null +++ b/client/src/it/scala/skuber/PodDisruptionBudgetSpec.scala @@ -0,0 +1,76 @@ +package skuber + +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually +import skuber.apps.v1.Deployment +import skuber.policy.v1beta1.PodDisruptionBudget +import skuber.policy.v1beta1.PodDisruptionBudget._ + +class PodDisruptionBudgetSpec extends K8SFixture with Eventually with Matchers { + behavior of "PodDisruptionBudget" + + it should "create a PodDisruptionBudget" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + import LabelSelector.dsl._ + k8s.create(PodDisruptionBudget(name) + .withMinAvailable(Left(1)) + .withLabelSelector("app" is "nginx") + ).map { result => + assert(result.spec.contains(PodDisruptionBudget.Spec(None, Some(1), Some("app" is "nginx")))) + assert(result.name == name) + } + } + } + + it should "update a PodDisruptionBudget" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + import LabelSelector.dsl._ + k8s.create(PodDisruptionBudget(name) + .withMinAvailable(Left(1)) + .withLabelSelector("app" is "nginx") + ).flatMap(pdb => + eventually( + k8s.get[PodDisruptionBudget](pdb.name).flatMap { updatedPdb => + k8s.update(updatedPdb).map { result => //PodDisruptionBudget are immutable at the moment. + assert(result.spec.contains(PodDisruptionBudget.Spec(None, Some(1), Some("app" is "nginx")))) + assert(result.name == name) + } + } + ) + ) + } + } + + it should "delete a PodDisruptionBudget" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + import LabelSelector.dsl._ + k8s.create(PodDisruptionBudget(name) + .withMinAvailable(Left(1)) + .withLabelSelector("app" is "nginx") + ).flatMap { pdb => + k8s.delete[PodDisruptionBudget](pdb.name).flatMap { deleteResult => + k8s.get[PodDisruptionBudget](pdb.name).map { x => + assert(false) + } recoverWith { + case ex: K8SException if ex.status.code.contains(404) => assert(true) + case _ => assert(false) + } + } + } + } + } + + def getNginxDeployment(name: String, version: String): Deployment = { + import LabelSelector.dsl._ + val nginxContainer = getNginxContainer(version) + val nginxTemplate = Pod.Template.Spec.named("nginx").addContainer(nginxContainer).addLabel("app" -> "nginx") + Deployment(name).withTemplate(nginxTemplate).withLabelSelector("app" is "nginx") + } + + def getNginxContainer(version: String): Container = { + Container(name = "nginx", image = "nginx:" + version).exposePort(80) + } +} diff --git a/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala b/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala new file mode 100644 index 00000000..3f2e21a3 --- /dev/null +++ b/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala @@ -0,0 +1,86 @@ +package skuber.policy.v1beta1 + +import skuber.ResourceSpecification.{Names, Scope} +import skuber.{IntOrString, LabelSelector, NonCoreResourceSpecification, ObjectMeta, ObjectResource, ResourceDefinition, Scale, Timestamp} + +case class PodDisruptionBudget(override val kind: String = "PodDisruptionBudget", + override val apiVersion: String = policyAPIVersion, + metadata: ObjectMeta, + spec: Option[PodDisruptionBudget.Spec] = None, + status: Option[PodDisruptionBudget.Status] = None) extends ObjectResource { + + private lazy val copySpec: PodDisruptionBudget.Spec = this.spec.getOrElse(PodDisruptionBudget.Spec(selector=Some(LabelSelector()))) + + def withLabelSelector(sel: LabelSelector): PodDisruptionBudget = { + this.copy(spec = Some(copySpec.copy(selector = Some(sel)))) + } + + def withMaxUnavailable(value: IntOrString): PodDisruptionBudget = { + this.copy(spec = Some(copySpec.copy(maxUnavailable = Some(value)))) + } + + def withMinAvailable(value: IntOrString): PodDisruptionBudget = { + this.copy(spec = Some(copySpec.copy(minAvailable = Some(value)))) + } +} + +object PodDisruptionBudget { + + def apply(name: String): PodDisruptionBudget = { + PodDisruptionBudget(metadata = ObjectMeta(name = name)) + } + + val specification = NonCoreResourceSpecification( + apiGroup = "policy", + version = "v1beta1", + scope = Scope.Namespaced, + names = Names( + plural = "poddisruptionbudgets", + singular = "poddisruptionbudget", + kind = "PodDisruptionBudget", + shortNames = List("pdb") + ) + ) + implicit val stsDef: ResourceDefinition[PodDisruptionBudget] = new ResourceDefinition[PodDisruptionBudget] { + def spec: NonCoreResourceSpecification = specification + } + implicit val stsListDef: ResourceDefinition[PodDisruptionBudgetList] = new ResourceDefinition[PodDisruptionBudgetList] { + def spec: NonCoreResourceSpecification = specification + } + + case class Spec(maxUnavailable: Option[IntOrString] = None, + minAvailable: Option[IntOrString] = None, + selector: Option[LabelSelector] = None) + + case class Status(currentHealthy: Int, + desiredHealthy: Int, + disruptedPods: Map[String, Timestamp], + disruptionsAllowed: Int, + expectedPods: Int, + observedGeneration: Option[Int]) + + import play.api.libs.functional.syntax._ + import play.api.libs.json.{Format, JsPath} + import skuber.json.format._ + + implicit val depStatusFmt: Format[Status] = ( + (JsPath \ "currentHealthy").formatMaybeEmptyInt() and + (JsPath \ "desiredHealthy").formatMaybeEmptyInt() and + (JsPath \ "disruptedPods").formatMaybeEmptyMap[Timestamp] and + (JsPath \ "disruptionsAllowed").formatMaybeEmptyInt() and + (JsPath \ "expectedPods").formatMaybeEmptyInt() and + (JsPath \ "observedGeneration").formatNullable[Int] + ) (Status.apply, unlift(Status.unapply)) + + implicit val depSpecFmt: Format[Spec] = ( + (JsPath \ "maxUnavailable").formatNullable[IntOrString] and + (JsPath \ "minAvailable").formatNullable[IntOrString] and + (JsPath \ "selector").formatNullableLabelSelector + ) (Spec.apply, unlift(Spec.unapply)) + + implicit lazy val pdbFormat: Format[PodDisruptionBudget] = ( + objFormat and + (JsPath \ "spec").formatNullable[Spec] and + (JsPath \ "status").formatNullable[Status] + )(PodDisruptionBudget.apply, unlift(PodDisruptionBudget.unapply)) +} \ No newline at end of file diff --git a/client/src/main/scala/skuber/policy/v1beta1/package.scala b/client/src/main/scala/skuber/policy/v1beta1/package.scala new file mode 100644 index 00000000..feda3f0c --- /dev/null +++ b/client/src/main/scala/skuber/policy/v1beta1/package.scala @@ -0,0 +1,8 @@ +package skuber.policy + +import skuber.ListResource + +package object v1beta1 { + val policyAPIVersion = "policy/v1beta1" + type PodDisruptionBudgetList = ListResource[skuber.policy.v1beta1.PodDisruptionBudget] +} diff --git a/client/src/test/resources/examplePodDisruptionBudget.json b/client/src/test/resources/examplePodDisruptionBudget.json new file mode 100644 index 00000000..8ea1118e --- /dev/null +++ b/client/src/test/resources/examplePodDisruptionBudget.json @@ -0,0 +1 @@ +{"kind":"PodDisruptionBudget","apiVersion":"policy/v1beta1","metadata":{"name":"someName"},"spec":{"maxUnavailable":2,"minAvailable":1,"selector":{"matchLabels":{"application":"someApplicationName"}}}} \ No newline at end of file diff --git a/client/src/test/scala/skuber/policy/v1beta1/PodDisruptionBudgetSpec.scala b/client/src/test/scala/skuber/policy/v1beta1/PodDisruptionBudgetSpec.scala new file mode 100644 index 00000000..29036955 --- /dev/null +++ b/client/src/test/scala/skuber/policy/v1beta1/PodDisruptionBudgetSpec.scala @@ -0,0 +1,34 @@ +package skuber.policy.v1beta1 + +import org.specs2.mutable.Specification +import play.api.libs.json.{JsSuccess, Json} +import skuber.LabelSelector.dsl._ + +class PodDisruptionBudgetSpec extends Specification { + import PodDisruptionBudget._ + "A PodDisruptionBudget can" >> { + "decoded from json" >> { + val pdb = PodDisruptionBudget("someName") + .withMaxUnavailable(Left(2)) + .withMinAvailable(Left(1)) + .withLabelSelector("application" is "someApplicationName") + + Json.parse(createJson("/examplePodDisruptionBudget.json")).validate[PodDisruptionBudget] mustEqual JsSuccess(pdb) + } + "encode to json" >> { + Json.stringify( + Json.toJson( + PodDisruptionBudget("someName") + .withMaxUnavailable(Left(2)) + .withMinAvailable(Left(1)) + .withLabelSelector("application" is "someApplicationName") + ) + ) mustEqual createJson("/examplePodDisruptionBudget.json") + } + } + + private def createJson(file: String): String = { + val source = scala.io.Source.fromURL(getClass.getResource(file)) + try source.mkString finally source.close() + } +} From 796c1e84e7e7ee540b7de159d8dc6b182e1d00b9 Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Wed, 26 Sep 2018 11:24:26 +0100 Subject: [PATCH 07/21] Add missing list formatter for PodDisruptionBudget (#217) --- .../main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala b/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala index 3f2e21a3..a2829edb 100644 --- a/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala +++ b/client/src/main/scala/skuber/policy/v1beta1/PodDisruptionBudget.scala @@ -83,4 +83,6 @@ object PodDisruptionBudget { (JsPath \ "spec").formatNullable[Spec] and (JsPath \ "status").formatNullable[Status] )(PodDisruptionBudget.apply, unlift(PodDisruptionBudget.unapply)) + + implicit val pdbListFormat: Format[PodDisruptionBudgetList] = ListResourceFormat[PodDisruptionBudget] } \ No newline at end of file From 7dc5a4564116d3ad5bdf7cbe9b29f55892c04f86 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Wed, 26 Sep 2018 19:32:58 +0900 Subject: [PATCH 08/21] Handle Empty Values in Kube Config (#215) --- client/src/main/scala/skuber/api/Configuration.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/skuber/api/Configuration.scala b/client/src/main/scala/skuber/api/Configuration.scala index 2a92d99e..c2b8370c 100644 --- a/client/src/main/scala/skuber/api/Configuration.scala +++ b/client/src/main/scala/skuber/api/Configuration.scala @@ -191,8 +191,12 @@ object Configuration { val k8sAuthInfoMap = topLevelYamlToK8SConfigMap("user", toK8SAuthInfo) def toK8SContext(contextConfig: YamlMap) = { - val cluster=contextConfig.asScala.get("cluster").flatMap(clusterName => k8sClusterMap.get(clusterName.asInstanceOf[String])).get - val authInfo =contextConfig.asScala.get("user").flatMap(userKey => k8sAuthInfoMap.get(userKey.asInstanceOf[String])).get + val cluster=contextConfig.asScala.get("cluster").filterNot(_ == "").map { clusterName => + k8sClusterMap.get(clusterName.asInstanceOf[String]).get + }.getOrElse(Cluster()) + val authInfo =contextConfig.asScala.get("user").filterNot(_ == "").map { userKey => + k8sAuthInfoMap.get(userKey.asInstanceOf[String]).get + }.getOrElse(NoAuth) val namespace=contextConfig.asScala.get("namespace").fold(Namespace.default) { name=>Namespace.forName(name.asInstanceOf[String]) } Context(cluster,authInfo,namespace) } From 97f1ee32333f1cd9fa11544ef19321377d2336ec Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Wed, 26 Sep 2018 19:33:37 +0900 Subject: [PATCH 09/21] Define implicit configmap list format (#216) --- client/src/main/scala/skuber/json/package.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/main/scala/skuber/json/package.scala b/client/src/main/scala/skuber/json/package.scala index fffa28e4..deefba9c 100644 --- a/client/src/main/scala/skuber/json/package.scala +++ b/client/src/main/scala/skuber/json/package.scala @@ -1050,6 +1050,7 @@ package object format { implicit val podListFmt: Format[PodList] = ListResourceFormat[Pod] implicit val nodeListFmt: Format[NodeList] = ListResourceFormat[Node] + implicit val configMapListFmt: Format[ConfigMapList] = ListResourceFormat[ConfigMap] implicit val serviceListFmt: Format[ServiceList] = ListResourceFormat[Service] implicit val endpointsListFmt: Format[EndpointsList] = ListResourceFormat[Endpoints] implicit val eventListFmt: Format[EventList] = ListResourceFormat[Event] From 8bbb9554366a12a9b29510656d8aeee6ff0c7b27 Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Wed, 26 Sep 2018 11:35:38 +0100 Subject: [PATCH 10/21] Add support HorizontalPodAutoscaler v2beta1 (#214) * Add initial version of autoscaler v2beta1 * Add support for HorizontalPodAutoscaler v2beta1 * Fix complication issue * Add ITs for HorizontalPodAutoscalerV2Beta1 * Add missing list formatter --- .../HorizontalPodAutoscalerV2Beta1Spec.scala | 101 ++++++ .../v2beta1/HorizontalPodAutoscaler.scala | 318 ++++++++++++++++++ .../skuber/autoscaling/v2beta1/package.scala | 9 + .../exampleHorizontalPodAutoscaler.json | 1 + .../v2beta1/HorizontalPodAutoscalerSpec.scala | 115 +++++++ 5 files changed, 544 insertions(+) create mode 100644 client/src/it/scala/skuber/HorizontalPodAutoscalerV2Beta1Spec.scala create mode 100644 client/src/main/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscaler.scala create mode 100644 client/src/main/scala/skuber/autoscaling/v2beta1/package.scala create mode 100644 client/src/test/resources/exampleHorizontalPodAutoscaler.json create mode 100644 client/src/test/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscalerSpec.scala diff --git a/client/src/it/scala/skuber/HorizontalPodAutoscalerV2Beta1Spec.scala b/client/src/it/scala/skuber/HorizontalPodAutoscalerV2Beta1Spec.scala new file mode 100644 index 00000000..ac7ef322 --- /dev/null +++ b/client/src/it/scala/skuber/HorizontalPodAutoscalerV2Beta1Spec.scala @@ -0,0 +1,101 @@ +package skuber + +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually +import skuber.apps.v1.Deployment +import skuber.autoscaling.v2beta1.HorizontalPodAutoscaler +import skuber.autoscaling.v2beta1.HorizontalPodAutoscaler.ResourceMetricSource + +class HorizontalPodAutoscalerV2Beta1Spec extends K8SFixture with Eventually with Matchers { + behavior of "HorizontalPodAutoscalerV2Beta1" + + it should "create a HorizontalPodAutoscaler" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + println(name) + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + k8s.create( + HorizontalPodAutoscaler(name).withSpec( + HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(2) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None)) + ) + ).map { result => + assert(result.name == name) + assert(result.spec.contains( + HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(2) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None))) + ) + } + } + } + + it should "update a HorizontalPodAutoscaler" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + k8s.create( + HorizontalPodAutoscaler(name).withSpec( + HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(2) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None)) + ) + ).flatMap(created => + eventually( + k8s.get[HorizontalPodAutoscaler](created.name).flatMap { existing => + val udpated = existing.withSpec(HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(3) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None))) + + k8s.update(udpated).map { result => + assert(result.name == name) + assert(result.spec.contains( + HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(3) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None)) + )) + } + } + ) + ) + } + } + + it should "delete a HorizontalPodAutoscaler" in { k8s => + val name: String = java.util.UUID.randomUUID().toString + k8s.create(getNginxDeployment(name, "1.7.9")) flatMap { d => + k8s.create( + HorizontalPodAutoscaler(name).withSpec( + HorizontalPodAutoscaler.Spec("v1", "Deployment", "nginx") + .withMinReplicas(1) + .withMaxReplicas(2) + .addResourceMetric(ResourceMetricSource(Resource.cpu, Some(80), None)) + ) + ).flatMap { created => + k8s.delete[HorizontalPodAutoscaler](created.name).flatMap { deleteResult => + k8s.get[HorizontalPodAutoscaler](created.name).map { x => + assert(false) + } recoverWith { + case ex: K8SException if ex.status.code.contains(404) => assert(true) + case _ => assert(false) + } + } + } + } + } + + def getNginxDeployment(name: String, version: String): Deployment = { + import LabelSelector.dsl._ + val nginxContainer = getNginxContainer(version) + val nginxTemplate = Pod.Template.Spec.named("nginx").addContainer(nginxContainer).addLabel("app" -> "nginx") + Deployment(name).withTemplate(nginxTemplate).withLabelSelector("app" is "nginx") + } + + def getNginxContainer(version: String): Container = { + Container(name = "nginx", image = "nginx:" + version).exposePort(80) + } +} diff --git a/client/src/main/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscaler.scala b/client/src/main/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscaler.scala new file mode 100644 index 00000000..37632570 --- /dev/null +++ b/client/src/main/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscaler.scala @@ -0,0 +1,318 @@ +package skuber.autoscaling.v2beta1 + +import play.api.libs.json._ +import skuber.ResourceSpecification.{Names, Scope} +import skuber.apps.v1.{Deployment, DeploymentList} +import skuber.{LabelSelector, LimitRange, NonCoreResourceSpecification, ObjectMeta, ObjectResource, Resource, ResourceDefinition, Timestamp} + +case class HorizontalPodAutoscaler(override val kind: String = "HorizontalPodAutoscaler", + override val apiVersion: String = autoscalingAPIVersion, + metadata: ObjectMeta, + spec: Option[HorizontalPodAutoscaler.Spec] = None, + status: Option[HorizontalPodAutoscaler.Status] = None) extends ObjectResource { + + def withNamespace(namespace: String): HorizontalPodAutoscaler = { + this.copy(metadata = this.metadata.copy(namespace = namespace)) + } + + def withSpec(spec: HorizontalPodAutoscaler.Spec): HorizontalPodAutoscaler = { + this.copy(spec = Some(spec)) + } + + def withStatus(status: HorizontalPodAutoscaler.Status): HorizontalPodAutoscaler = { + this.copy(status = Some(status)) + } +} + +object HorizontalPodAutoscaler { + def apply(name: String): HorizontalPodAutoscaler = { + HorizontalPodAutoscaler(metadata = ObjectMeta(name = name)) + } + + val specification = NonCoreResourceSpecification( + apiGroup = "autoscaling", + version = "v2beta1", + scope = Scope.Namespaced, + names = Names( + plural = "horizontalpodautoscalers", + singular = "horizontalpodautoscaler", + kind = "HorizontalPodAutoscaler", + shortNames = List("hpa") + ) + ) + implicit val stsDef: ResourceDefinition[HorizontalPodAutoscaler] = new ResourceDefinition[HorizontalPodAutoscaler] { + def spec: NonCoreResourceSpecification = specification + } + + implicit val stsListDef: ResourceDefinition[HorizontalPodAutoscalerList] = new ResourceDefinition[HorizontalPodAutoscalerList] { + def spec: NonCoreResourceSpecification = specification + } + + object MetricsSourceType extends Enumeration { + type MetricsSourceType = Value + val Object, Pods, Resource, External = Value + } + + sealed trait Metric { + def `type`: MetricsSourceType.MetricsSourceType + } + + case class ObjectMetric(`object`: ObjectMetricSource) extends Metric { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Object + } + + case class PodsMetric(pods: PodsMetricSource) extends Metric { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Pods + } + + case class ResourceMetric(resource: ResourceMetricSource) extends Metric{ + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Resource + } + + case class ExternalMetric(external: ExternalMetricSource) extends Metric{ + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.External + } + + case class ObjectMetricSource(target: CrossVersionObjectReference, + metricName: String, + targetValue: Resource.Quantity, + selector: Option[LabelSelector], + averageValue: Option[Resource.Quantity]) + + case class PodsMetricSource(metricName: String, + targetAverageValue: Resource.Quantity, + selector: Option[LabelSelector]) + + case class ResourceMetricSource(name: String, + targetAverageUtilization: Option[Int], + targetAverageValue: Option[Resource.Quantity]) + + case class ExternalMetricSource(metricName: String, + metricSelector: Option[LabelSelector], + targetValue: Option[Resource.Quantity], + targetAverageValue: Option[Resource.Quantity]) + + sealed trait MetricStatus { + def `type`: MetricsSourceType.MetricsSourceType + } + + case class ObjectMetricStatusHolder(`object`: ObjectMetricStatus) extends MetricStatus { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Object + } + + case class PodsMetricStatusHolder(pods: PodsMetricStatus) extends MetricStatus { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Pods + } + + case class ResourceMetricStatusHolder(resource: ResourceMetricStatus) extends MetricStatus { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.Resource + } + + case class ExternalMetricStatusHolder(external: ExternalMetricStatus) extends MetricStatus { + val `type`: MetricsSourceType.MetricsSourceType = MetricsSourceType.External + } + + case class ObjectMetricStatus(target: CrossVersionObjectReference, + metricName: String, + currentValue: Resource.Quantity, + selector: Option[LabelSelector], + averageValue: Option[Resource.Quantity]) + + case class PodsMetricStatus(metricName: String, + currentAverageValue: Resource.Quantity, + selector: Option[LabelSelector]) + + case class ResourceMetricStatus(name: String, + currentAverageUtilization: Option[Int], + currentAverageValue: Option[Resource.Quantity]) + + case class ExternalMetricStatus(metricName: String, + metricSelector: Option[LabelSelector], + currentValue: Option[Resource.Quantity], + currentAverageValue: Option[Resource.Quantity]) + + case class CrossVersionObjectReference(apiVersion: String, + kind: String, + name: String) + + case class Condition(`type`: String, + status: String, + lastTransitionTime: Option[Timestamp], + reason: Option[String], + message: Option[String]) + + case class Status(observedGeneration:Option[Int], + lastScaleTime:Option[Timestamp], + currentReplicas: Int, + desiredReplicas: Int, + currentMetrics: List[MetricStatus], + conditions: List[Condition]) + + object Spec { + def apply(apiVersion: String, kind: String, name: String): Spec = { + new Spec(CrossVersionObjectReference(apiVersion, kind, name)) + } + } + + case class Spec(scaleTargetRef: CrossVersionObjectReference, + minReplicas: Option[Int] = Some(1), + maxReplicas: Int = 1, + metrics: List[Metric] = List()) { + + def addResourceMetric(metric: ResourceMetricSource): Spec = { + this.copy(metrics = this.metrics :+ ResourceMetric(metric)) + } + + def addPodMetric(metric: PodsMetricSource): Spec = { + this.copy(metrics = this.metrics :+ PodsMetric(metric)) + } + + def addObjectMetric(metric: ObjectMetricSource): Spec = { + this.copy(metrics = this.metrics :+ ObjectMetric(metric)) + } + + def addExternalMetric(metric: ExternalMetricSource): Spec = { + this.copy(metrics = this.metrics :+ ExternalMetric(metric)) + } + + def withMinReplicas(replicas: Int): Spec = { + this.copy(minReplicas = Some(replicas)) + } + + def withMaxReplicas(replicas: Int): Spec = { + this.copy(maxReplicas = replicas) + } + } + + import play.api.libs.functional.syntax._ + import play.api.libs.json.{Format, JsPath} + import skuber.json.format._ + + implicit val crossVersionObjectReferenceFmt: Format[CrossVersionObjectReference] = Json.format[CrossVersionObjectReference] + implicit val conditionFmt: Format[Condition] = Json.format[Condition] + implicit val limitRangeItemTypeFmt: Format[LimitRange.ItemType.Type] = enumFormat(LimitRange.ItemType) + implicit val metricsSourceTypeFmt: Format[MetricsSourceType.Value] = Format(enumReads(MetricsSourceType), enumWrites) + + implicit val resourceMetricStatusFmt: Format[ResourceMetricStatus] = Json.format[ResourceMetricStatus] + + implicit val objectMetricStatusFmt: Format[ObjectMetricStatus] = ( + (JsPath \ "target").format[CrossVersionObjectReference] and + (JsPath \ "metricName").format[String] and + (JsPath \ "currentValue").format[Resource.Quantity] and + (JsPath \ "selector").formatNullableLabelSelector and + (JsPath \ "averageValue").formatNullable[Resource.Quantity] + ) (ObjectMetricStatus.apply, unlift(ObjectMetricStatus.unapply)) + + implicit val podsMetricStatusFmt: Format[PodsMetricStatus] = ( + (JsPath \ "metricName").format[String] and + (JsPath \ "currentAverageValue").format[Resource.Quantity] and + (JsPath \ "selector").formatNullableLabelSelector + ) (PodsMetricStatus.apply, unlift(PodsMetricStatus.unapply)) + + implicit val externalMetricStatusFmt: Format[ExternalMetricStatus] = ( + (JsPath \ "metricName").format[String] and + (JsPath \ "metricSelector").formatNullableLabelSelector and + (JsPath \ "currentValue").formatNullable[Resource.Quantity] and + (JsPath \ "currentAverageValue").formatNullable[Resource.Quantity] + ) (ExternalMetricStatus.apply, unlift(ExternalMetricStatus.unapply)) + + + implicit val objectMetricStatusHolderFmt: Format[ObjectMetricStatusHolder] = Json.format[ObjectMetricStatusHolder] + implicit val podsMetricStatusHolderFmt: Format[PodsMetricStatusHolder] = Json.format[PodsMetricStatusHolder] + implicit val resourceMetricStatusHolderFmt: Format[ResourceMetricStatusHolder] = Json.format[ResourceMetricStatusHolder] + implicit val externalMetricStatusHolderFmt: Format[ExternalMetricStatusHolder] = Json.format[ExternalMetricStatusHolder] + + implicit val metricStatusWrite: Writes[MetricStatus] = Writes[MetricStatus] { + case s: PodsMetricStatusHolder => JsPath.write[PodsMetricStatusHolder](podsMetricStatusHolderFmt).writes(s) + ("type" -> JsString("Pods")) + case s: ObjectMetricStatusHolder => JsPath.write[ObjectMetricStatusHolder](objectMetricStatusHolderFmt).writes(s) + ("type" -> JsString("Object")) + case s: ResourceMetricStatusHolder => JsPath.write[ResourceMetricStatusHolder](resourceMetricStatusHolderFmt).writes(s) + ("type" -> JsString("Resource")) + case s: ExternalMetricStatusHolder => JsPath.write[ExternalMetricStatusHolder](externalMetricStatusHolderFmt).writes(s) + ("type" -> JsString("External")) + } + + implicit val metricStatusReads: Reads[MetricStatus] = new Reads[MetricStatus] { + override def reads(json: JsValue): JsResult[MetricStatus] = { + (json \ "type").as[String].toUpperCase match { + case "OBJECT" => JsSuccess(json.as[ObjectMetricStatusHolder]) + case "PODS" => JsSuccess(json.as[PodsMetricStatusHolder]) + case "RESOURCE" => JsSuccess(json.as[ResourceMetricStatusHolder]) + case "EXTERNAL" => JsSuccess(json.as[ExternalMetricStatusHolder]) + } + } + } + + implicit val metricStatusFormat: Format[MetricStatus] = Format(metricStatusReads, metricStatusWrite) + + implicit val depStatusFmt: Format[Status] = ( + (JsPath \ "observedGeneration").formatNullable[Int] and + (JsPath \ "lastScaleTime").formatNullable[Timestamp] and + (JsPath \ "currentReplicas").format[Int] and + (JsPath \ "desiredReplicas").format[Int] and + (JsPath \ "currentMetrics").formatMaybeEmptyList[MetricStatus] and + (JsPath \ "conditions").formatMaybeEmptyList[Condition] + ) (Status.apply, unlift(Status.unapply) + ) + + implicit val resourceMetricSourceFmt: Format[ResourceMetricSource] = Json.format[ResourceMetricSource] + + implicit val objectMetricSourceFmt: Format[ObjectMetricSource] = ( + (JsPath \ "target").format[CrossVersionObjectReference] and + (JsPath \ "metricName").format[String] and + (JsPath \ "targetValue").format[Resource.Quantity] and + (JsPath \ "selector").formatNullableLabelSelector and + (JsPath \ "averageValue").formatNullable[Resource.Quantity] + ) (ObjectMetricSource.apply, unlift(ObjectMetricSource.unapply)) + + implicit val podsMetricSourceFmt: Format[PodsMetricSource] = ( + (JsPath \ "metricName").format[String] and + (JsPath \ "targetAverageValue").format[Resource.Quantity] and + (JsPath \ "selector").formatNullableLabelSelector + ) (PodsMetricSource.apply, unlift(PodsMetricSource.unapply)) + + implicit val externalMetricSourceFmt: Format[ExternalMetricSource] = ( + (JsPath \ "metricName").format[String] and + (JsPath \ "metricSelector").formatNullableLabelSelector and + (JsPath \ "targetValue").formatNullable[Resource.Quantity] and + (JsPath \ "targetAverageValue").formatNullable[Resource.Quantity] + ) (ExternalMetricSource.apply, unlift(ExternalMetricSource.unapply)) + + + implicit val objectMetricFmt: Format[ObjectMetric] = Json.format[ObjectMetric] + implicit val podsMetricFmt: Format[PodsMetric] = Json.format[PodsMetric] + implicit val resourceMetricFmt: Format[ResourceMetric] = Json.format[ResourceMetric] + implicit val externalMetricFmt: Format[ExternalMetric] = Json.format[ExternalMetric] + + implicit val metricReads: Reads[Metric] = new Reads[Metric] { + override def reads(json: JsValue): JsResult[Metric] = { + (json \ "type").as[String].toUpperCase match { + case "OBJECT" => JsSuccess(json.as[ObjectMetric]) + case "PODS" => JsSuccess(json.as[PodsMetric]) + case "RESOURCE" => JsSuccess(json.as[ResourceMetric]) + case "EXTERNAL" => JsSuccess(json.as[ExternalMetric]) + } + } + } + + implicit val metricWrite: Writes[Metric] = Writes[Metric] { + case s: PodsMetric => JsPath.write[PodsMetric](podsMetricFmt).writes(s) + ("type" -> JsString("Pods")) + case s: ObjectMetric => JsPath.write[ObjectMetric](objectMetricFmt).writes(s) + ("type" -> JsString("Object")) + case s: ResourceMetric => JsPath.write[ResourceMetric](resourceMetricFmt).writes(s) + ("type" -> JsString("Resource")) + case s: ExternalMetric => JsPath.write[ExternalMetric](externalMetricFmt).writes(s) + ("type" -> JsString("External")) + } + + implicit val metricFormat: Format[Metric] = Format(metricReads, metricWrite) + + implicit val depSpecFmt: Format[Spec] = ( + (JsPath \ "scaleTargetRef").format[CrossVersionObjectReference] and + (JsPath \ "minReplicas").formatNullable[Int] and + (JsPath \ "maxReplicas").format[Int] and + (JsPath \ "metrics").formatMaybeEmptyList[Metric] + ) (Spec.apply, unlift(Spec.unapply)) + + implicit lazy val horizontalPodAutoscalerFormat: Format[HorizontalPodAutoscaler] = ( + objFormat and + (JsPath \ "spec").formatNullable[Spec] and + (JsPath \ "status").formatNullable[Status] + )(HorizontalPodAutoscaler.apply, unlift(HorizontalPodAutoscaler.unapply)) + + implicit val horizontalPodAutoscalerListFormat: Format[HorizontalPodAutoscalerList] = ListResourceFormat[HorizontalPodAutoscaler] +} \ No newline at end of file diff --git a/client/src/main/scala/skuber/autoscaling/v2beta1/package.scala b/client/src/main/scala/skuber/autoscaling/v2beta1/package.scala new file mode 100644 index 00000000..dd0eb0d7 --- /dev/null +++ b/client/src/main/scala/skuber/autoscaling/v2beta1/package.scala @@ -0,0 +1,9 @@ + +package skuber.autoscaling + +import skuber.ListResource + +package object v2beta1 { + val autoscalingAPIVersion = "autoscaling/v2beta1" + type HorizontalPodAutoscalerList = ListResource[skuber.autoscaling.v2beta1.HorizontalPodAutoscaler] +} diff --git a/client/src/test/resources/exampleHorizontalPodAutoscaler.json b/client/src/test/resources/exampleHorizontalPodAutoscaler.json new file mode 100644 index 00000000..72934528 --- /dev/null +++ b/client/src/test/resources/exampleHorizontalPodAutoscaler.json @@ -0,0 +1 @@ +{"kind":"HorizontalPodAutoscaler","apiVersion":"autoscaling/v2beta1","metadata":{"name":"someName","namespace":"someNamespace"},"spec":{"scaleTargetRef":{"apiVersion":"v2","kind":"Deployment","name":"someDeploymentName"},"minReplicas":2,"maxReplicas":4,"metrics":[{"object":{"target":{"apiVersion":"v2","kind":"Deployment","name":"someDeploymentName"},"metricName":"someObjectMetricName","targetValue":"1","selector":{"matchLabels":{"application":"someObjectapp"}},"averageValue":"2"},"type":"Object"},{"pods":{"metricName":"somePodsMetricName","targetAverageValue":"3","selector":{"matchLabels":{"application":"somePodsApp"}}},"type":"Pods"},{"resource":{"name":"someResourceName","targetAverageUtilization":10,"targetAverageValue":"4"},"type":"Resource"},{"external":{"metricName":"someExternalMetricsName","metricSelector":{"matchLabels":{"metrics":"someMetric"}},"targetValue":"5","targetAverageValue":"6"},"type":"External"}]},"status":{"observedGeneration":100,"lastScaleTime":"2018-01-01T12:30:00Z","currentReplicas":201,"desiredReplicas":202,"currentMetrics":[{"object":{"target":{"apiVersion":"v2","kind":"Deployment","name":"someDeploymentName"},"metricName":"someObjectMetricName","currentValue":"1","selector":{"matchLabels":{"application":"someObjectapp"}},"averageValue":"2"},"type":"Object"},{"pods":{"metricName":"somePodsMetricName","currentAverageValue":"3","selector":{"matchLabels":{"application":"somePodsApp"}}},"type":"Pods"},{"resource":{"name":"someResourceName","currentAverageUtilization":10,"currentAverageValue":"4"},"type":"Resource"},{"external":{"metricName":"someExternalMetricsName","metricSelector":{"matchLabels":{"metrics":"someMetric"}},"currentValue":"5","currentAverageValue":"6"},"type":"External"}],"conditions":[{"type":"someType","status":"someStatus","lastTransitionTime":"2017-01-01T12:30:00Z","reason":"someReason","message":"someMessage"}]}} \ No newline at end of file diff --git a/client/src/test/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscalerSpec.scala b/client/src/test/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscalerSpec.scala new file mode 100644 index 00000000..ba3c9e7e --- /dev/null +++ b/client/src/test/scala/skuber/autoscaling/v2beta1/HorizontalPodAutoscalerSpec.scala @@ -0,0 +1,115 @@ +package skuber.autoscaling.v2beta1 + +import java.time.{ZoneId, ZonedDateTime} + +import org.specs2.mutable.Specification +import play.api.libs.json.{JsSuccess, Json} +import skuber.{Resource, Timestamp} +import skuber.LabelSelector.dsl._ + +class HorizontalPodAutoscalerSpec extends Specification { + + import HorizontalPodAutoscaler._ + + val lastScaleTime: Timestamp = ZonedDateTime.of(2018, 1, 1, 12, 30, 0, 0, ZoneId.of("Z")) + val conditionTime: Timestamp = ZonedDateTime.of(2017, 1, 1, 12, 30, 0, 0, ZoneId.of("Z")) + + "A HorizontalPodAutoscaler can" >> { + "decoded from json" >> { + Json.parse(createJson("/exampleHorizontalPodAutoscaler.json")).validate[HorizontalPodAutoscaler] mustEqual JsSuccess(hpa) + } + + "encode to json" >> { + Json.stringify( + Json.toJson(hpa) + ) mustEqual createJson("/exampleHorizontalPodAutoscaler.json") + } + } + + val hpa: HorizontalPodAutoscaler = HorizontalPodAutoscaler("someName").withNamespace("someNamespace").withSpec( + HorizontalPodAutoscaler.Spec("v2", "Deployment", "someDeploymentName") + .withMinReplicas(2) + .withMaxReplicas(4).addObjectMetric( + ObjectMetricSource( + CrossVersionObjectReference("v2", "Deployment", "someDeploymentName"), + "someObjectMetricName", + Resource.Quantity("1"), + Some("application" is "someObjectapp"), + Some(Resource.Quantity("2")) + ) + ).addPodMetric( + PodsMetricSource( + "somePodsMetricName", + Resource.Quantity("3"), + Some("application" is "somePodsApp") + ) + ).addResourceMetric( + ResourceMetricSource( + "someResourceName", + Some(10), + Some(Resource.Quantity("4")) + ) + ).addExternalMetric( + ExternalMetricSource( + "someExternalMetricsName", + Some("metrics" is "someMetric"), + Some(Resource.Quantity("5")), + Some(Resource.Quantity("6")) + ) + ) + ).withStatus( + HorizontalPodAutoscaler.Status( + Some(100), + Some(lastScaleTime), + 201, + 202, + List( + ObjectMetricStatusHolder( + ObjectMetricStatus( + CrossVersionObjectReference("v2", "Deployment", "someDeploymentName"), + "someObjectMetricName", + Resource.Quantity("1"), + Some("application" is "someObjectapp"), + Some(Resource.Quantity("2")) + ) + ), + PodsMetricStatusHolder( + PodsMetricStatus( + "somePodsMetricName", + Resource.Quantity("3"), + Some("application" is "somePodsApp") + ) + ), + ResourceMetricStatusHolder( + ResourceMetricStatus( + "someResourceName", + Some(10), + Some(Resource.Quantity("4")) + ) + ), + ExternalMetricStatusHolder( + ExternalMetricStatus( + "someExternalMetricsName", + Some("metrics" is "someMetric"), + Some(Resource.Quantity("5")), + Some(Resource.Quantity("6")) + ) + ) + ), + List( + Condition( + "someType", + "someStatus", + Some(conditionTime), + Some("someReason"), + Some("someMessage") + ) + ) + ) + ) + + private def createJson(file: String): String = { + val source = scala.io.Source.fromURL(getClass.getResource(file)) + try source.mkString finally source.close() + } +} From 86c961aa4c8fed9593b91d9a512478ad9c664236 Mon Sep 17 00:00:00 2001 From: Peter Barron Date: Thu, 27 Sep 2018 07:15:10 +0100 Subject: [PATCH 11/21] Fix auth header (#219) --- client/src/main/scala/skuber/api/package.scala | 2 +- .../skuber/api/security/HTTPRequestAuth.scala | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index 7de8c75c..206d1250 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -769,7 +769,7 @@ package object client { // Compose headers var headers: List[HttpHeader] = List(RawHeader("Accept", "*/*")) - headers ++= HTTPRequestAuth.getAuthHeaders(requestAuth) + headers ++= HTTPRequestAuth.getAuthHeader(requestAuth).map(a => List(a)).getOrElse(List()) // Convert `String` to `ByteString`, then prepend channel bytes val source: Source[ws.Message, Promise[Option[ws.Message]]] = maybeStdin.getOrElse(Source.empty).viaMat(Flow[String].map { s => diff --git a/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala b/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala index 805ff186..f0a1ec71 100644 --- a/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala +++ b/client/src/main/scala/skuber/api/security/HTTPRequestAuth.scala @@ -10,18 +10,14 @@ import skuber.api.client._ object HTTPRequestAuth { def addAuth(request: HttpRequest, auth: AuthInfo) : HttpRequest = { - getAuthHeaders(auth).foreach { header => - // Add headers one by one because `addHeaders()` doesn't convert the instance type - request.addHeader(header) - } - request + getAuthHeader(auth).map(request.addHeader).getOrElse(request) } - def getAuthHeaders(auth: AuthInfo) : Seq[HttpHeader] = { + def getAuthHeader(auth: AuthInfo) : Option[HttpHeader] = { auth match { - case NoAuth | _: CertAuth => Seq() - case BasicAuth(user, password) => Seq(Authorization(BasicHttpCredentials(user,password))) - case auth: AccessTokenAuth => Seq(Authorization(OAuth2BearerToken(auth.accessToken))) + case NoAuth | _: CertAuth => None + case BasicAuth(user, password) => Some(Authorization(BasicHttpCredentials(user,password))) + case auth: AccessTokenAuth => Some(Authorization(OAuth2BearerToken(auth.accessToken))) } } } From 7d1fbe7775d9e04e0cc84df9825fcfc637eb230e Mon Sep 17 00:00:00 2001 From: David O'Riordan Date: Thu, 27 Sep 2018 14:25:20 +0100 Subject: [PATCH 12/21] Set connection context in exec method (#222) --- client/src/main/scala/skuber/api/package.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index 206d1250..76bdceb9 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -753,12 +753,12 @@ package object client { } queries ++= command.map("command" -> _) - // Determine scheme - val scheme = sslContext match { - case Some(_) => - "wss" + // Determine scheme and connection context based on SSL context + val (scheme, connectionContext) = sslContext match { + case Some(ssl) => + ("wss",ConnectionContext.https(ssl, enabledProtocols = Some(scala.collection.immutable.Seq("TLSv1.2", "TLSv1")))) case None => - "ws" + ("ws", Http().defaultClientHttpsContext) } // Compose URI @@ -798,12 +798,13 @@ package object client { SinkShape(partition.in) }) + // Make a flow from the source to the sink val flow: Flow[ws.Message, ws.Message, Promise[Option[ws.Message]]] = Flow.fromSinkAndSourceMat(sink, source)(Keep.right) // upgradeResponse completes or fails when the connection succeeds or fails // and promise controls the connection close timing - val (upgradeResponse, promise) = Http().singleWebSocketRequest(ws.WebSocketRequest(uri, headers, subprotocol = Option("channel.k8s.io")), flow) + val (upgradeResponse, promise) = Http().singleWebSocketRequest(ws.WebSocketRequest(uri, headers, subprotocol = Option("channel.k8s.io")), flow, connectionContext) val connected = upgradeResponse.map { upgrade => // just like a regular http request we can access response status which is available via upgrade.response.status From c2abd813060b4a815b32bec54262bb32ba9c6827 Mon Sep 17 00:00:00 2001 From: David O'Riordan Date: Fri, 28 Sep 2018 08:47:22 +0100 Subject: [PATCH 13/21] v2.0.11 release (#224) --- README.md | 2 +- build.sbt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e205fd40..fb181229 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Provides you with a configured client on startup. It is handy to use this for qu > Just handy shortcut to import skuber inside ammonite-repl: ```scala - import $ivy.`io.skuber::skuber:2.0.10`, skuber._, skuber.json.format._ + import $ivy.`io.skuber::skuber:2.0.11`, skuber._, skuber.json.format._ ``` ### Interactive with sbt diff --git a/build.sbt b/build.sbt index 3db4ba83..ea095c0a 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,7 @@ scalacOptions += "-target:jvm-1.8" scalacOptions in Test ++= Seq("-Yrangepos") -version in ThisBuild := "2.0.10" +version in ThisBuild := "2.0.11" sonatypeProfileName := "io.skuber" From 2d7824dee9c4cda052e4bf0afdc5870acdc3ed06 Mon Sep 17 00:00:00 2001 From: David O'Riordan Date: Fri, 28 Sep 2018 09:15:04 +0100 Subject: [PATCH 14/21] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fb181229..1b8340b6 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,7 @@ To get minikube follow the instructions [here](https://github.com/kubernetes/min You can use the latest release (for Scala 2.11 or 2.12) by adding to your build: ```sbt -libraryDependencies += "io.skuber" %% "skuber" % "2.0.10" +libraryDependencies += "io.skuber" %% "skuber" % "2.0.11" ``` Meanwhile users of skuber v1 can continue to use the latest (and possibly final, with exception of important fixes) v1.x release, which is available only on Scala 2.11: From f3da00a3ddff72d40ae3cc1832c8670b92c9ca12 Mon Sep 17 00:00:00 2001 From: David O'Riordan Date: Sat, 29 Sep 2018 08:09:23 +0100 Subject: [PATCH 15/21] Fixes issue #225 (getPodLogSource returning 400 when >1 containers) (#226) --- client/src/main/scala/skuber/Pod.scala | 2 +- .../skuber/examples/podlogs/PodLogsExample.scala | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/skuber/Pod.scala b/client/src/main/scala/skuber/Pod.scala index 85efbc3d..72caa05d 100644 --- a/client/src/main/scala/skuber/Pod.scala +++ b/client/src/main/scala/skuber/Pod.scala @@ -259,7 +259,7 @@ object Pod { timestamps: Option[Boolean] = None) { lazy val asOptionalsMap: Map[String, Option[String]] = Map( - "containerName" -> containerName, + "container" -> containerName, "follow" -> follow.map(_.toString), "limitBytes" -> limitBytes.map(_.toString), "pretty" -> pretty.map(_.toString), diff --git a/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala b/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala index 768c7e9a..b05dd6b3 100644 --- a/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala +++ b/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala @@ -19,13 +19,13 @@ import scala.concurrent.duration._ */ object PodLogExample extends App { - val printLogFlow: Sink[ByteString, NotUsed] = Flow[ByteString] + def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = Flow[ByteString] .via(Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) - .to(Sink.foreach(text => println(s"[hello-world logs] $text"))) + .to(Sink.foreach(text => println(s"[${cntrName} logs] $text"))) implicit val system = ActorSystem() @@ -36,16 +36,20 @@ object PodLogExample extends App { client.LoggingConfig(logRequestBasic = false, logResponseBasic = false) ) val helloWorldContainer=Container(name="hello-world", image="busybox", command=List("sh", "-c", "echo Hello World! && echo Goodbye World && sleep 60")) - val helloWorldPod=Pod("hello-world", Pod.Spec().addContainer(helloWorldContainer)) + val helloWorldContainer2=Container(name="hello-world2", image="busybox", command=List("sh", "-c", "echo Hello World again! && echo Goodbye World again && sleep 60")) + val helloWorldPod=Pod("hello-world", Pod.Spec().addContainer(helloWorldContainer).addContainer(helloWorldContainer2)) val podFut = k8s.create(helloWorldPod) + println("Waiting 30 seconds to allow pod initialisation to complete before getting logs...") Thread.sleep(30000) for { pod <- podFut - logsSource <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams()) - donePrinting = logsSource.runWith(printLogFlow) - } yield donePrinting + logsSource <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world"))) + logsSource1 <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world2"))) + donePrinting = logsSource.runWith(printLogFlow("hello-world")) + donePrinting1 = logsSource1.runWith(printLogFlow("hello-world2")) + } yield (donePrinting, donePrinting1) // allow another 5 seconds for logs to be streamed from the pod to stdout before cleaning up Thread.sleep(5000) From 6a57f46f4a9f5b8f648ed7290837b0f1be6d523c Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Wed, 3 Oct 2018 14:00:59 +0900 Subject: [PATCH 16/21] Format size limit of EmptyDir (#228) --- .../src/main/scala/skuber/json/package.scala | 33 +++++++++---------- .../scala/skuber/json/VolumeFormatSpec.scala | 16 +++++---- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/client/src/main/scala/skuber/json/package.scala b/client/src/main/scala/skuber/json/package.scala index deefba9c..f55e5b68 100644 --- a/client/src/main/scala/skuber/json/package.scala +++ b/client/src/main/scala/skuber/json/package.scala @@ -472,23 +472,22 @@ package object format { implicit val lifecycleFormat: Format[Lifecycle] = Json.format[Lifecycle] import Volume._ - - implicit val emptyDirReads: Reads[EmptyDir] = { - (JsPath \ "medium").readNullable[String].map { - case Some(med) if med == "Memory" => EmptyDir(MemoryStorageMedium) - case Some(med) if med == "HugePages" => EmptyDir(HugePagesStorageMedium) - case _ => EmptyDir(DefaultStorageMedium) - } - } - implicit val emptyDirWrites: Writes[EmptyDir] = Writes[EmptyDir] { - ed => ed.medium match { - case DefaultStorageMedium => (JsPath \ "medium").write[String].writes("") - case MemoryStorageMedium => (JsPath \ "medium").write[String].writes("Memory") - case HugePagesStorageMedium => (JsPath \ "medium").write[String].writes("HugePages") - } - } - implicit val emptyDirFormat: Format[EmptyDir] = Format(emptyDirReads, emptyDirWrites) - + + implicit val storageMediumFormat: Format[StorageMedium] = Format[StorageMedium](Reads[StorageMedium] { + case JsString(med) if med == "Memory" => JsSuccess(MemoryStorageMedium) + case JsString(med) if med == "HugePages" => JsSuccess(HugePagesStorageMedium) + case _ => JsSuccess(DefaultStorageMedium) + }, Writes[StorageMedium] { + case DefaultStorageMedium => JsString("") + case MemoryStorageMedium => JsString("Memory") + case HugePagesStorageMedium => JsString("HugePages") + }) + + implicit val emptyDirFormat: Format[EmptyDir] = ( + (JsPath \ "medium").formatWithDefault[StorageMedium](DefaultStorageMedium) and + (JsPath \ "sizeLimit").formatNullable[Resource.Quantity] + )(EmptyDir.apply _, unlift(EmptyDir.unapply)) + implicit val hostPathFormat: Format[HostPath] = Json.format[HostPath] implicit val keyToPathFormat: Format[KeyToPath] = Json.format[KeyToPath] implicit val volumeSecretFormat: Format[skuber.Volume.Secret] = Json.format[skuber.Volume.Secret] diff --git a/client/src/test/scala/skuber/json/VolumeFormatSpec.scala b/client/src/test/scala/skuber/json/VolumeFormatSpec.scala index c067d8c7..ba38523f 100644 --- a/client/src/test/scala/skuber/json/VolumeFormatSpec.scala +++ b/client/src/test/scala/skuber/json/VolumeFormatSpec.scala @@ -55,15 +55,19 @@ class VolumeReadWriteSpec extends Specification { // Volume reader and writer "A Volume spec can be symmetrically written to json and the same value read back in\n" >> { "this can be done for the emptydir type source spec" >> { - val edVol = Volume("myVol", Volume.EmptyDir()) + val edVol = Volume("myVol", Volume.EmptyDir( + Volume.HugePagesStorageMedium, + sizeLimit = Some(Resource.Quantity("100M")))) val myVolJson = Json.toJson(edVol) val readVol = Json.fromJson[Volume](myVolJson).get readVol.name mustEqual "myVol" - readVol.source match { - case Volume.EmptyDir(medium,_) => medium mustEqual Volume.DefaultStorageMedium - case _ => Failure("not an emptyDir!") - } - readVol.source mustEqual Volume.EmptyDir() + readVol.source mustEqual Volume.EmptyDir(Volume.HugePagesStorageMedium, Some(Resource.Quantity("100M"))) + + // Ensure empty EmptyDir is still deserizeable + val emptyEmptyDirJson = JsObject.empty + val readEmptyDir = Json.fromJson[Volume.EmptyDir](emptyEmptyDirJson).get + readEmptyDir.medium mustEqual Volume.DefaultStorageMedium + readEmptyDir.sizeLimit mustEqual None } "this can be done for the a hostpath type source" >> { From ecb2bc4149c6b0c5eefa7e62db608c0228b29b3b Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Thu, 18 Oct 2018 19:21:03 +0900 Subject: [PATCH 17/21] Add sinceTime option for pod logs (#232) --- client/src/main/scala/skuber/Pod.scala | 4 ++++ .../main/scala/skuber/examples/podlogs/PodLogsExample.scala | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/skuber/Pod.scala b/client/src/main/scala/skuber/Pod.scala index 72caa05d..5720ffb6 100644 --- a/client/src/main/scala/skuber/Pod.scala +++ b/client/src/main/scala/skuber/Pod.scala @@ -1,5 +1,7 @@ package skuber +import java.time.format.DateTimeFormatter + /** * @author David O'Riordan */ @@ -255,6 +257,7 @@ object Pod { pretty: Option[Boolean] = None, previous: Option[Boolean] = None, sinceSeconds: Option[Int] = None, + sinceTime: Option[Timestamp] = None, tailLines: Option[Int] = None, timestamps: Option[Boolean] = None) { @@ -265,6 +268,7 @@ object Pod { "pretty" -> pretty.map(_.toString), "previous" -> previous.map(_.toString), "sinceSeconds" -> sinceSeconds.map(_.toString), + "sinceTime" -> sinceTime.map(_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)), "tailLines" -> tailLines.map(_.toString), "timestamps" -> timestamps.map(_.toString)) diff --git a/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala b/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala index b05dd6b3..589873d3 100644 --- a/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala +++ b/examples/src/main/scala/skuber/examples/podlogs/PodLogsExample.scala @@ -45,8 +45,8 @@ object PodLogExample extends App { Thread.sleep(30000) for { pod <- podFut - logsSource <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world"))) - logsSource1 <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world2"))) + logsSource <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world"), sinceSeconds = Some(9999999))) + logsSource1 <- k8s.getPodLogSource("hello-world", Pod.LogQueryParams(containerName = Some("hello-world2"), sinceTime = pod.metadata.creationTimestamp)) donePrinting = logsSource.runWith(printLogFlow("hello-world")) donePrinting1 = logsSource1.runWith(printLogFlow("hello-world2")) } yield (donePrinting, donePrinting1) From 027aea85f03df5d7216d80d426c67e51b63112c6 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Tue, 23 Oct 2018 22:16:06 +0900 Subject: [PATCH 18/21] Implement the patch method (#227) * Implement the patch method * Move patch objects to the dedicated package * Refactor patch strategies --- client/src/it/scala/skuber/PatchSpec.scala | 118 ++++++++++++++++++ .../src/main/scala/skuber/api/package.scala | 26 ++++ client/src/main/scala/skuber/api/patch.scala | 77 ++++++++++++ .../src/main/scala/skuber/json/package.scala | 36 +++++- client/src/main/scala/skuber/package.scala | 3 +- 5 files changed, 258 insertions(+), 2 deletions(-) create mode 100644 client/src/it/scala/skuber/PatchSpec.scala create mode 100644 client/src/main/scala/skuber/api/patch.scala diff --git a/client/src/it/scala/skuber/PatchSpec.scala b/client/src/it/scala/skuber/PatchSpec.scala new file mode 100644 index 00000000..f2d8f1f5 --- /dev/null +++ b/client/src/it/scala/skuber/PatchSpec.scala @@ -0,0 +1,118 @@ +package skuber + +import org.scalatest.{BeforeAndAfterAll, Matchers} +import org.scalatest.concurrent.Eventually +import skuber.api.patch._ +import skuber.json.format._ + +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.util.Success + + +class PatchSpec extends K8SFixture with Eventually with Matchers with BeforeAndAfterAll { + val nginxPodName: String = java.util.UUID.randomUUID().toString + + override def beforeAll(): Unit = { + super.beforeAll() + + val k8s = k8sInit + Await.result(k8s.create(getNginxPod(nginxPodName, "1.7.9")), 3 second) + // Let the pod running + Thread.sleep(3000) + k8s.close + } + + override def afterAll(): Unit = { + val k8s = k8sInit + Await.result(k8s.delete[Pod](nginxPodName), 3 second) + Thread.sleep(3000) + k8s.close + + super.afterAll() + } + + behavior of "Patch" + + it should "patch a pod with strategic merge patch by default" in { k8s => + val randomString = java.util.UUID.randomUUID().toString + val patchData = MetadataPatch(labels = Some(Map("foo" -> randomString)), annotations = None) + k8s.patch[MetadataPatch, Pod](nginxPodName, patchData).map { _ => + eventually(timeout(10 seconds), interval(1 seconds)) { + val retrievePod = k8s.get[Pod](nginxPodName) + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get + podRetrieved match { + case Success(pod: Pod) => + assert(pod.metadata.labels == Map("label" -> "1", "foo" -> randomString)) + assert(pod.metadata.annotations == Map()) + case _ => assert(false) + } + } + } + } + + it should "patch a pod with strategic merge patch" in { k8s => + val randomString = java.util.UUID.randomUUID().toString + val patchData = new MetadataPatch(labels = Some(Map("foo" -> randomString)), annotations = None, strategy = StrategicMergePatchStrategy) + k8s.patch[MetadataPatch, Pod](nginxPodName, patchData).map { _ => + eventually(timeout(10 seconds), interval(1 seconds)) { + val retrievePod = k8s.get[Pod](nginxPodName) + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get + podRetrieved match { + case Success(pod: Pod) => + assert(pod.metadata.labels == Map("label" -> "1", "foo" -> randomString)) + assert(pod.metadata.annotations == Map()) + case _ => assert(false) + } + } + } + } + + it should "patch a pod with json merge patch" in { k8s => + val randomString = java.util.UUID.randomUUID().toString + val patchData = new MetadataPatch(labels = Some(Map("foo" -> randomString)), annotations = None, strategy = JsonMergePatchStrategy) + k8s.patch[MetadataPatch, Pod](nginxPodName, patchData).map { _ => + eventually(timeout(10 seconds), interval(1 seconds)) { + val retrievePod = k8s.get[Pod](nginxPodName) + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get + podRetrieved match { + case Success(pod: Pod) => + assert(pod.metadata.labels == Map("label" -> "1", "foo" -> randomString)) + assert(pod.metadata.annotations == Map()) + case _ => assert(false) + } + } + } + } + + it should "patch a pod with json patch" in { k8s => + val randomString = java.util.UUID.randomUUID().toString + val annotations = Map("skuber" -> "wow") + val patchData = JsonPatchOperationList(List( + JsonPatchOperation.Add("/metadata/labels/foo", randomString), + JsonPatchOperation.Add("/metadata/annotations", randomString), + JsonPatchOperation.Remove("/metadata/annotations"), + )) + k8s.patch[JsonPatchOperationList, Pod](nginxPodName, patchData).map { _ => + eventually(timeout(10 seconds), interval(1 seconds)) { + val retrievePod = k8s.get[Pod](nginxPodName) + val podRetrieved = Await.ready(retrievePod, 2 seconds).value.get + podRetrieved match { + case Success(pod: Pod) => + assert(pod.metadata.labels == Map("label" -> "1", "foo" -> randomString)) + assert(pod.metadata.annotations == Map()) + case _ => assert(false) + } + } + } + } + + def getNginxContainer(version: String): Container = Container(name = "nginx", image = "nginx:" + version).exposePort(80) + + def getNginxPod(name: String, version: String): Pod = { + val nginxContainer = getNginxContainer(version) + val nginxPodSpec = Pod.Spec(containers = List((nginxContainer))) + Pod(metadata = ObjectMeta(nginxPodName, labels = Map("label" -> "1"), annotations = Map("annotation" -> "1")) + , spec = Some(nginxPodSpec)) + } +} diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index 76bdceb9..e00a8c67 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -30,6 +30,7 @@ import skuber.json.format._ import skuber.json.format.apiobj._ import skuber._ import skuber.api.WatchSource.Start +import skuber.api.patch._ import scala.concurrent.duration._ @@ -721,6 +722,31 @@ package object client { makeRequestReturningObjectResource[O](httpRequest) } + def patch[P <: Patch, O <: ObjectResource](name: String, patchData: P, namespace: Option[String] = None) + (implicit patchfmt: Writes[P], fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext = RequestLoggingContext()): Future[O] = { + val targetNamespace = namespace.getOrElse(namespaceName) + + val contentType = patchData.strategy match { + case StrategicMergePatchStrategy => + CustomMediaTypes.`application/strategic-merge-patch+json` + case JsonMergePatchStrategy => + CustomMediaTypes.`application/merge-patch+json` + case JsonPatchStrategy => + MediaTypes.`application/json-patch+json` + } + + logInfo(logConfig.logRequestBasicMetadata, s"Requesting patch of resource: { name:$name ... }") + logInfo(logConfig.logRequestFullObjectResource, s" Marshal and send: ${patchData.toString}") + + val marshal = Marshal(patchData) + for { + requestEntity <- marshal.to[RequestEntity] + httpRequest = buildRequest(HttpMethods.PATCH, rd, Some(name), namespace = targetNamespace) + .withEntity(requestEntity.withContentType(contentType)) + newOrUpdatedResource <- makeRequestReturningObjectResource[O](httpRequest) + } yield newOrUpdatedResource + } + // get API versions supported by the cluster def getServerAPIVersions(implicit lc: LoggingContext=RequestLoggingContext()): Future[List[String]] = { val url = clusterServer + "/api" diff --git a/client/src/main/scala/skuber/api/patch.scala b/client/src/main/scala/skuber/api/patch.scala new file mode 100644 index 00000000..3995b1e3 --- /dev/null +++ b/client/src/main/scala/skuber/api/patch.scala @@ -0,0 +1,77 @@ +package skuber.api + +import akka.http.scaladsl.model.{HttpCharsets, MediaType} +import play.api.libs.json.Writes + +package object patch { + + object CustomMediaTypes { + val `application/merge-patch+json` = MediaType.applicationWithFixedCharset("merge-patch+json", HttpCharsets.`UTF-8`) + val `application/strategic-merge-patch+json` = MediaType.applicationWithFixedCharset("strategic-merge-patch+json", HttpCharsets.`UTF-8`) + } + + object JsonPatchOperation { + sealed trait Operation { + def op: String + } + + trait ValueOperation[T] extends Operation { + type ValueType = T + def path: String + def value: ValueType + def fmt: Writes[T] + } + + trait UnaryOperation extends Operation { + def path: String + } + + trait DirectionalOperation extends Operation { + def from: String + def path: String + } + + case class Add[T](path: String, value: T)(implicit val fmt: Writes[T]) extends ValueOperation[T] { + val op = "add" + } + + case class Remove(path: String) extends UnaryOperation { + val op = "remove" + } + + case class Replace[T](path: String, value: T)(implicit val fmt: Writes[T]) extends ValueOperation[T] { + val op = "replace" + } + + case class Move(from: String, path: String) extends DirectionalOperation { + val op = "move" + } + + case class Copy(from: String, path: String) extends DirectionalOperation { + val op = "copy" + } + } + + sealed trait Patch { + val strategy: PatchStrategy + } + + sealed trait PatchStrategy + + sealed trait MergePatchStrategy extends PatchStrategy + + case object StrategicMergePatchStrategy extends MergePatchStrategy + + case object JsonMergePatchStrategy extends MergePatchStrategy + + case object JsonPatchStrategy extends PatchStrategy + + case class JsonPatchOperationList(operations: List[JsonPatchOperation.Operation]) extends Patch { + override val strategy = JsonPatchStrategy + } + + case class MetadataPatch(labels: Option[Map[String, String]] = Some(Map()), + annotations: Option[Map[String, String]] = Some(Map()), + override val strategy: MergePatchStrategy = StrategicMergePatchStrategy) extends Patch + +} diff --git a/client/src/main/scala/skuber/json/package.scala b/client/src/main/scala/skuber/json/package.scala index f55e5b68..ea1e0a2a 100644 --- a/client/src/main/scala/skuber/json/package.scala +++ b/client/src/main/scala/skuber/json/package.scala @@ -8,6 +8,7 @@ import org.apache.commons.codec.binary.Base64 import play.api.libs.functional.syntax._ import play.api.libs.json._ import skuber._ +import skuber.api.patch.{JsonPatchOperation, JsonPatchOperationList, MetadataPatch} /** * @author David O'Riordan @@ -1095,6 +1096,39 @@ package object format { (JsPath \ "object").format[T] )(WatchEvent.apply[T] _, unlift(WatchEvent.unapply[T])) - } + } + + implicit def jsonPatchOperationWrite = Writes[JsonPatchOperation.Operation] { value => + JsObject(Map("op" -> JsString(value.op)) ++ (value match { + case v: JsonPatchOperation.ValueOperation[_] => + Map( + "path" -> JsString(v.path), + "value" -> v.fmt.writes(v.value) + ) + case v: JsonPatchOperation.UnaryOperation => + Map( + "path" -> JsString(v.path) + ) + case v: JsonPatchOperation.DirectionalOperation => + Map( + "from" -> JsString(v.from), + "path" -> JsString(v.path) + ) + })) + } + + implicit def jsonPatchOperationListWrite = Writes[JsonPatchOperationList] { value => + JsArray(value.operations.map(jsonPatchOperationWrite.writes)) } + + implicit val metadataPatchWrite = Writes[MetadataPatch] { value => + val labels = value.labels.map { + m => JsObject(m.mapValues(JsString)) + }.getOrElse(JsNull) + val annotations = value.annotations.map { + m => JsObject(m.mapValues(JsString)) + }.getOrElse(JsNull) + val metadata = JsObject(Map("labels" -> labels, "annotations" -> annotations)) + JsObject(Map("metadata" -> metadata)) + } } diff --git a/client/src/main/scala/skuber/package.scala b/client/src/main/scala/skuber/package.scala index 3846ebe6..e267ed8d 100644 --- a/client/src/main/scala/skuber/package.scala +++ b/client/src/main/scala/skuber/package.scala @@ -2,9 +2,10 @@ import scala.language.implicitConversions import java.net.URL +import akka.http.scaladsl.model.{HttpCharsets, MediaType} import akka.stream.Materializer import com.typesafe.config.Config -import play.api.libs.json.Format +import play.api.libs.json._ import skuber.api.client.{RequestContext, Status} /* From 12755402f9c20396c4076288a0726df2f83f3494 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Thu, 1 Nov 2018 18:02:59 +0900 Subject: [PATCH 19/21] Improve pod log (#236) * Refactor the invoke method for watch * Improve the pod log method * Add integration tests for the pod log method * Improve assertion message of regexp matching * Reuse connection context --- client/src/it/scala/skuber/K8SFixture.scala | 4 +- client/src/it/scala/skuber/PodLogSpec.scala | 74 +++++++++++++++++++ client/src/main/resources/reference.conf | 5 ++ client/src/main/scala/skuber/api/Watch.scala | 4 +- .../src/main/scala/skuber/api/package.scala | 52 +++++++------ 5 files changed, 114 insertions(+), 25 deletions(-) create mode 100644 client/src/it/scala/skuber/PodLogSpec.scala diff --git a/client/src/it/scala/skuber/K8SFixture.scala b/client/src/it/scala/skuber/K8SFixture.scala index 0d1daae5..f58ef40c 100644 --- a/client/src/it/scala/skuber/K8SFixture.scala +++ b/client/src/it/scala/skuber/K8SFixture.scala @@ -14,8 +14,10 @@ trait K8SFixture extends fixture.AsyncFlatSpec { implicit val materializer = ActorMaterializer() implicit val dispatcher = system.dispatcher + val config = ConfigFactory.load() + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { - val k8s = k8sInit + val k8s = k8sInit(config) complete { withFixture(test.toNoArgAsyncTest(k8s)) } lastly { diff --git a/client/src/it/scala/skuber/PodLogSpec.scala b/client/src/it/scala/skuber/PodLogSpec.scala new file mode 100644 index 00000000..c0362abd --- /dev/null +++ b/client/src/it/scala/skuber/PodLogSpec.scala @@ -0,0 +1,74 @@ +package skuber + +import java.time.ZonedDateTime + +import akka.stream.scaladsl.TcpIdleTimeoutException +import com.typesafe.config.ConfigFactory +import org.scalatest.{BeforeAndAfterAll, Matchers} +import org.scalatest.concurrent.Eventually +import skuber.Pod.LogQueryParams +import skuber.json.format._ + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class PodLogSpec extends K8SFixture with Eventually with Matchers with BeforeAndAfterAll { + val podName: String = java.util.UUID.randomUUID().toString + + behavior of "PodLog" + + val idleTimeout = 3 seconds + override val config = ConfigFactory.parseString(s"skuber.pod-log.idle-timeout=${idleTimeout.toSeconds}s").withFallback(ConfigFactory.load()) + + override def beforeAll(): Unit = { + super.beforeAll() + + val k8s = k8sInit(config) + Await.result(k8s.create(getNginxPod(podName, "1.7.9")), 3 second) + // Let the pod running + Thread.sleep(3000) + k8s.close + } + + override def afterAll(): Unit = { + val k8s = k8sInit(config) + Await.result(k8s.delete[Pod](podName), 3 second) + Thread.sleep(3000) + k8s.close + + super.afterAll() + } + + it should "get log of a pod" in { k8s => + k8s.getPodLogSource(podName, LogQueryParams(follow = Some(false))).flatMap { source => + source.map(_.utf8String).runReduce(_ + _).map { s => + assert(s == "foo\n") + } + } + } + + it should "tail log of a pod and timeout after a while" in { k8s => + var log = "" + var start = ZonedDateTime.now() + k8s.getPodLogSource(podName, LogQueryParams(follow = Some(true))).flatMap { source => + source.map(_.utf8String).runForeach(log += _) + }.failed.map { case e: TcpIdleTimeoutException => + val msgPattern = s"TCP idle-timeout encountered on connection to [^,]+, no bytes passed in the last ${idleTimeout}" + assert(e.getMessage.matches(msgPattern), s"""["${e.getMessage}"] does not match ["${msgPattern}"]""") + assert(log == "foo\n") + assert(ZonedDateTime.now().isAfter(start.withSecond(idleTimeout.toSeconds.toInt))) + } + } + + def getNginxContainer(version: String): Container = Container( + name = "ubuntu", image = "nginx:" + version, + command = List("sh"), + args = List("-c", s"""echo "foo"; trap exit TERM; sleep infinity & wait""") + ) + + def getNginxPod(name: String, version: String): Pod = { + val container = getNginxContainer(version) + val podSpec = Pod.Spec(containers = List((container))) + Pod.named(podName).copy(spec = Some(podSpec)) + } +} diff --git a/client/src/main/resources/reference.conf b/client/src/main/resources/reference.conf index 51a256bc..2f30af5a 100644 --- a/client/src/main/resources/reference.conf +++ b/client/src/main/resources/reference.conf @@ -10,6 +10,11 @@ skuber { idle-timeout = null } + pod-log { + # The idle timeout duration for any connections used by skuber `pod log` requests - if null the timeout is infinite. + idle-timeout = null + } + watch-continuously { # Timeout that is passed to the kubernetes cluster for all list/watch calls. This limits the duration of the call, # regardless of any activity or inactivity. diff --git a/client/src/main/scala/skuber/api/Watch.scala b/client/src/main/scala/skuber/api/Watch.scala index 7db96513..732aa853 100644 --- a/client/src/main/scala/skuber/api/Watch.scala +++ b/client/src/main/scala/skuber/api/Watch.scala @@ -33,7 +33,7 @@ object Watch { val maybeResourceVersionQuery = sinceResourceVersion map { version => Uri.Query("resourceVersion" -> version) } val request = context.buildRequest(HttpMethods.GET, rd, Some(name), query = maybeResourceVersionQuery, watch = true) - val responseFut = context.invoke(request, watch = true) + val responseFut = context.invokeWatch(request) toFutureWatchEventSource(context, responseFut, bufSize) } @@ -54,7 +54,7 @@ object Watch { val maybeResourceVersionQuery = sinceResourceVersion map { v => Uri.Query("resourceVersion" -> v) } val request = context.buildRequest(HttpMethods.GET, rd, None, query = maybeResourceVersionQuery, watch = true) - val responseFut = context.invoke(request, watch = true) + val responseFut = context.invokeWatch(request) toFutureWatchEventSource(context, responseFut, bufSize) } diff --git a/client/src/main/scala/skuber/api/package.scala b/client/src/main/scala/skuber/api/package.scala index e00a8c67..3825bc1b 100644 --- a/client/src/main/scala/skuber/api/package.scala +++ b/client/src/main/scala/skuber/api/package.scala @@ -209,13 +209,14 @@ package object client { } class RequestContext(val requestMaker: (Uri, HttpMethod) => HttpRequest, - val requestInvoker: (HttpRequest, Boolean) => Future[HttpResponse], val clusterServer: String, val requestAuth: AuthInfo, val namespaceName: String, val watchContinuouslyRequestTimeout: Duration, val watchContinuouslyIdleTimeout: Duration, val watchPoolIdleTimeout: Duration, + val watchSettings: ConnectionPoolSettings, + val podLogSettings: ConnectionPoolSettings, val sslContext: Option[SSLContext], val logConfig: LoggingConfig, val closeHook: Option[() => Unit]) @@ -223,17 +224,28 @@ package object client { val log = Logging.getLogger(actorSystem, "skuber.api") + val connectionContext = sslContext + .map { ssl => + ConnectionContext.https(ssl, enabledProtocols = Some(scala.collection.immutable.Seq("TLSv1.2", "TLSv1"))) + } + .getOrElse(Http().defaultClientHttpsContext) + private val clusterServerUri = Uri(clusterServer) private var isClosed = false - private[skuber] def invoke(request: HttpRequest, watch: Boolean = false)(implicit lc: LoggingContext): Future[HttpResponse] = { + private[skuber] def invokeWatch(request: HttpRequest)(implicit lc: LoggingContext): Future[HttpResponse] = invoke(request, watchSettings) + + private[skuber] def invokeLog(request: HttpRequest)(implicit lc: LoggingContext): Future[HttpResponse] = invoke(request, podLogSettings) + + private[skuber] def invoke(request: HttpRequest, settings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem))(implicit lc: LoggingContext): Future[HttpResponse] = { if (isClosed) { logError("Attempt was made to invoke request on closed API request context") throw new IllegalStateException("Request context has been closed") } logInfo(logConfig.logRequestBasic, s"about to send HTTP request: ${request.method.value} ${request.uri.toString}") - val responseFut = requestInvoker(request, watch) + + val responseFut = Http().singleRequest(request, settings = settings, connectionContext = connectionContext) responseFut onComplete { case Success(response) => logInfo(logConfig.logResponseBasic,s"received response with HTTP status ${response.status.intValue()}") case Failure(ex) => logError("HTTP request resulted in an unexpected exception",ex) @@ -603,8 +615,14 @@ package object client { val nameComponent=s"${name}/log" val rd = implicitly[ResourceDefinition[Pod]] val request = buildRequest(HttpMethods.GET, rd, Some(nameComponent), query, false, targetNamespace) - invoke(request).map { response => - response.entity.dataBytes + invokeLog(request).flatMap { response => + val statusOptFut = checkResponseStatus(response) + statusOptFut map { + case Some(status) => + throw new K8SException(status) + case _ => + response.entity.dataBytes + } } } @@ -869,9 +887,9 @@ package object client { * and using same credentials and other configuration. */ def usingNamespace(newNamespace: String): RequestContext = - new RequestContext(requestMaker, requestInvoker, clusterServer, requestAuth, + new RequestContext(requestMaker, clusterServer, requestAuth, newNamespace, watchContinuouslyRequestTimeout, watchContinuouslyIdleTimeout, - watchPoolIdleTimeout, sslContext, logConfig, closeHook + watchPoolIdleTimeout, watchSettings, podLogSettings, sslContext, logConfig, closeHook ) private[skuber] def toKubernetesResponse[T](response: HttpResponse)(implicit reader: Reads[T], lc: LoggingContext): Future[T] = @@ -1003,6 +1021,7 @@ package object client { def durationFomConfig(configKey: String): Option[Duration] = Some(Duration.fromNanos(appConfig.getDuration(configKey).toNanos)) val watchIdleTimeout: Duration = getSkuberConfig("watch.idle-timeout", durationFomConfig, Duration.Inf) + val podLogIdleTimeout: Duration = getSkuberConfig("pod-log.idle-timeout", durationFomConfig, Duration.Inf) val watchContinuouslyRequestTimeout: Duration = getSkuberConfig("watch-continuously.request-timeout", durationFomConfig, 30.seconds) val watchContinuouslyIdleTimeout: Duration = getSkuberConfig("watch-continuously.idle-timeout", durationFomConfig, 60.seconds) @@ -1018,12 +1037,6 @@ package object client { val sslContext = TLS.establishSSLContext(k8sContext) - val connectionContext = sslContext - .map { ssl => - ConnectionContext.https(ssl, enabledProtocols = Some(scala.collection.immutable.Seq("TLSv1.2", "TLSv1"))) - } - .getOrElse(Http().defaultClientHttpsContext) - val theNamespaceName = k8sContext.namespace.name match { case "" => "default" case name => name @@ -1034,18 +1047,13 @@ package object client { val defaultClientSettings = ConnectionPoolSettings(actorSystem.settings.config) val watchConnectionSettings = defaultClientSettings.connectionSettings.withIdleTimeout(watchIdleTimeout) val watchSettings = defaultClientSettings.withConnectionSettings(watchConnectionSettings) - - val requestInvoker = (request: HttpRequest, watch: Boolean) => { - if (!watch) - Http().singleRequest(request, connectionContext = connectionContext) - else - Http().singleRequest(request, settings = watchSettings, connectionContext = connectionContext) - } + val podLogConnectionSettings = defaultClientSettings.connectionSettings.withIdleTimeout(podLogIdleTimeout) + val podLogSettings = defaultClientSettings.withConnectionSettings(podLogConnectionSettings) new RequestContext( - requestMaker, requestInvoker, k8sContext.cluster.server, k8sContext.authInfo, + requestMaker, k8sContext.cluster.server, k8sContext.authInfo, theNamespaceName, watchContinuouslyRequestTimeout, watchContinuouslyIdleTimeout, - watchPoolIdleTimeout, sslContext, logConfig, closeHook + watchPoolIdleTimeout, watchSettings, podLogSettings, sslContext, logConfig, closeHook ) } From e221715f97a9ec0d7698d2270eb4e2676bbdf95f Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Sat, 10 Nov 2018 16:28:31 +0900 Subject: [PATCH 20/21] Make Patch trait not sealed (#240) --- client/src/main/scala/skuber/api/patch.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/scala/skuber/api/patch.scala b/client/src/main/scala/skuber/api/patch.scala index 3995b1e3..eec9a28b 100644 --- a/client/src/main/scala/skuber/api/patch.scala +++ b/client/src/main/scala/skuber/api/patch.scala @@ -52,7 +52,7 @@ package object patch { } } - sealed trait Patch { + trait Patch { val strategy: PatchStrategy } From 69b9068a316caf8e4793828724560c82595f4692 Mon Sep 17 00:00:00 2001 From: David O'Riordan Date: Sat, 10 Nov 2018 16:15:40 +0000 Subject: [PATCH 21/21] Release v2.0.12 --- README.md | 2 +- build.sbt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1b8340b6..97c6bc4d 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Provides you with a configured client on startup. It is handy to use this for qu > Just handy shortcut to import skuber inside ammonite-repl: ```scala - import $ivy.`io.skuber::skuber:2.0.11`, skuber._, skuber.json.format._ + import $ivy.`io.skuber::skuber:2.0.12`, skuber._, skuber.json.format._ ``` ### Interactive with sbt diff --git a/build.sbt b/build.sbt index ea095c0a..df395fcf 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,7 @@ scalacOptions += "-target:jvm-1.8" scalacOptions in Test ++= Seq("-Yrangepos") -version in ThisBuild := "2.0.11" +version in ThisBuild := "2.0.12" sonatypeProfileName := "io.skuber"