diff --git a/client/src/it/scala/skuber/DynamicKubernetesClientImplTest.scala b/client/src/it/scala/skuber/DynamicKubernetesClientImplTest.scala new file mode 100644 index 00000000..78afca22 --- /dev/null +++ b/client/src/it/scala/skuber/DynamicKubernetesClientImplTest.scala @@ -0,0 +1,196 @@ +package skuber + +import java.util.UUID.randomUUID +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.matchers.should.Matchers +import play.api.libs.json.Json +import skuber.FutureUtil.FutureOps +import skuber.api.dynamic.client.impl.{DynamicKubernetesClientImpl, DynamicKubernetesObject, JsonRaw} +import skuber.apps.v1.Deployment +import scala.concurrent.Future +import scala.concurrent.duration._ + +class DynamicKubernetesClientImplTest extends K8SFixture with Eventually with Matchers with BeforeAndAfterAll with ScalaFutures { + + val deploymentName1: String = randomUUID().toString + val deploymentName2: String = randomUUID().toString + val deploymentName3: String = randomUUID().toString + val deploymentName4: String = randomUUID().toString + val deploymentName5: String = randomUUID().toString + + private val kubernetesDynamicClient = DynamicKubernetesClientImpl.build() + + override def afterAll(): Unit = { + val k8s = k8sInit(config) + + val results = Future.sequence(List(deploymentName1, deploymentName2, deploymentName3, deploymentName4, deploymentName5).map { name => + k8s.delete[Deployment](name).withTimeout().recover { case _ => () } + }).withTimeout().recover{ case _ => () } + + results.valueT + + results.onComplete { _ => + k8s.close + system.terminate().recover { case _ => () }.valueT + } + } + + behavior of "DynamicKubernetesClientImplTest" + + + it should "create a deployment" in { _ => + + val createdDeployment = Json.parse { + s""" + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": "$deploymentName1" + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "nginx" + } + }, + "template": { + "metadata": { + "name": "nginx", + "labels": { + "app": "nginx" + } + }, + "spec": { + "containers": [ + { + "name": "nginx", + "image": "nginx:1.7.9", + "ports": [ + { + "containerPort": 80, + "protocol": "TCP" + } + ] + } + ] + } + } + } + }""".stripMargin + } + + + val createdDeploymentResponse = kubernetesDynamicClient.create(JsonRaw(createdDeployment), resourcePlural = "deployments").valueT + + assert(createdDeploymentResponse.metadata.map(_.name).contains(deploymentName1)) + + val getDeployment: DynamicKubernetesObject = kubernetesDynamicClient.get( + deploymentName1, + apiVersion = "apps/v1", + resourcePlural = "deployments").valueT + + assert(getDeployment.metadata.map(_.name).contains(deploymentName1)) + } + + + it should "update a deployment replicas" in { k8s => + + //create a deployment + k8s.create(getNginxDeployment(deploymentName2)).valueT + + val updated = Json.parse { + s""" + { + "kind": "Deployment", + "apiVersion": "apps/v1", + "metadata": { + "name": "$deploymentName2" + }, + "spec": { + "replicas": 3, + "selector": { + "matchLabels": { + "app": "nginx" + } + }, + "template": { + "metadata": { + "name": "nginx", + "labels": { + "app": "nginx" + } + }, + "spec": { + "containers": [ + { + "name": "nginx", + "image": "nginx:1.7.9", + "ports": [ + { + "containerPort": 80, + "protocol": "TCP" + } + ] + } + ], + "restartPolicy": "Always", + "dnsPolicy": "ClusterFirst" + } + } + } + }""".stripMargin + } + + + val updatedDeploymentResponse = kubernetesDynamicClient.update(JsonRaw(updated), resourcePlural = "deployments").valueT + + assert(updatedDeploymentResponse.metadata.map(_.name).contains(deploymentName2)) + + val getDeployment: DynamicKubernetesObject = kubernetesDynamicClient.get( + deploymentName2, + apiVersion = "apps/v1", + resourcePlural = "deployments").valueT + + assert((getDeployment.jsonRaw.jsValue \ "spec" \ "replicas").asOpt[Int].contains(3)) + } + + it should "delete a deployment" in { k8s => + + //create a deployment + k8s.create(getNginxDeployment(deploymentName3)).valueT + + // delete a deployment + kubernetesDynamicClient.delete(deploymentName3, apiVersion = "apps/v1", resourcePlural = "deployments").valueT + + Thread.sleep(5000) + + eventually(timeout(30.seconds), interval(3.seconds)) { + whenReady(kubernetesDynamicClient.get(deploymentName3, apiVersion = "apps/v1", resourcePlural = "deployments").withTimeout().failed) { result => + result shouldBe a[K8SException] + result match { + case ex: K8SException => ex.status.code shouldBe Some(404) + case _ => assert(false) + } + } + } + } + + it should "list deployments" in { k8s => + val labels = Map("listDynamic" -> "true") + //create a deployment + k8s.create(getNginxDeployment(deploymentName4, labels = labels)).valueT + k8s.create(getNginxDeployment(deploymentName5, labels = labels)).valueT + + val listOptions = ListOptions(labelSelector = Some(LabelSelector(LabelSelector.IsEqualRequirement("listDynamic", "true")))) + // list deployments + val deployments = kubernetesDynamicClient.list(apiVersion = "apps/v1", resourcePlural = "deployments", options = Some(listOptions)).valueT + + assert(deployments.resources.size == 2) + assert(deployments.resources.flatMap(_.metadata.map(_.name)).toSet === Set(deploymentName4, deploymentName5)) + } + + +} diff --git a/client/src/it/scala/skuber/K8SFixture.scala b/client/src/it/scala/skuber/K8SFixture.scala index 6dd69f8a..6fecefe9 100644 --- a/client/src/it/scala/skuber/K8SFixture.scala +++ b/client/src/it/scala/skuber/K8SFixture.scala @@ -30,16 +30,21 @@ trait K8SFixture extends FixtureAnyFlatSpec { } def createNamespace(name: String, k8s: FixtureParam): Namespace = k8s.create[Namespace](Namespace.forName(name)).valueT + def deleteNamespace(name: String, k8s: FixtureParam): Unit = k8s.delete[Namespace](name).withTimeout().recover { case _ => () } def getNginxContainer(version: String): Container = Container(name = "nginx", image = "nginx:" + version).exposePort(80) - def getNginxDeployment(name: String, version: String): Deployment = { + def getNginxDeployment(name: String, version: String = "1.7.9", labels: Map[String, String] = Map.empty): Deployment = { import LabelSelector.dsl._ val nginxContainer = getNginxContainer(version) val nginxTemplate = Pod.Template.Spec.named("nginx").addContainer(nginxContainer).addLabel("app" -> "nginx") val labelSelector = LabelSelector(IsEqualRequirement("app", "nginx")) - Deployment(name).withTemplate(nginxTemplate).withLabelSelector(labelSelector) + + Deployment(name) + .copy(metadata = ObjectMeta(name = name, labels = labels)) + .withTemplate(nginxTemplate) + .withLabelSelector(labelSelector) } diff --git a/client/src/main/scala/skuber/api/client/Cluster.scala b/client/src/main/scala/skuber/api/client/Cluster.scala index 377db60d..a8da893b 100644 --- a/client/src/main/scala/skuber/api/client/Cluster.scala +++ b/client/src/main/scala/skuber/api/client/Cluster.scala @@ -3,16 +3,16 @@ package skuber.api.client import com.amazonaws.regions.Regions /** - * @author David O'Riordan - * - * Defines the details needed to communicate with the API server for a Kubernetes cluster - */ + * @author David O'Riordan + * + * Defines the details needed to communicate with the API server for a Kubernetes cluster + */ case class Cluster(apiVersion: String = "v1", - server: String = defaultApiServerURL, - insecureSkipTLSVerify: Boolean = false, - certificateAuthority: Option[PathOrData] = None, - clusterName: Option[String] = None, - awsRegion: Option[Regions] = None) { + server: String = defaultApiServerURL, + insecureSkipTLSVerify: Boolean = false, + certificateAuthority: Option[PathOrData] = None, + clusterName: Option[String] = None, + awsRegion: Option[Regions] = None) { def withName(name: String): Cluster = this.copy(clusterName = Some(name)) def withAwsRegion(region: Regions): Cluster = this.copy(awsRegion = Some(region)) diff --git a/client/src/main/scala/skuber/api/client/Context.scala b/client/src/main/scala/skuber/api/client/Context.scala index 4782c0da..9cd03c94 100644 --- a/client/src/main/scala/skuber/api/client/Context.scala +++ b/client/src/main/scala/skuber/api/client/Context.scala @@ -4,9 +4,9 @@ import skuber.Namespace /** * @author David O'Riordan - * Define the Kubernetes API context for requests + * Define the Kubernetes API context for requests */ case class Context(cluster: Cluster = Cluster(), - authInfo: AuthInfo = NoAuth, - namespace: Namespace = Namespace.default -) + authInfo: AuthInfo = NoAuth, + namespace: Namespace = Namespace.default + ) diff --git a/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesClientImpl.scala b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesClientImpl.scala new file mode 100644 index 00000000..0e51f780 --- /dev/null +++ b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesClientImpl.scala @@ -0,0 +1,359 @@ +package skuber.api.dynamic.client.impl + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.http.scaladsl.marshalling.{Marshal, Marshaller, ToEntityMarshaller} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.settings.ConnectionPoolSettings +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} +import play.api.libs.json.{JsString, JsValue} +import skuber.{DeleteOptions, ListOptions} +import skuber.api.client._ +import skuber.api.security.{HTTPRequestAuth, TLS} +import skuber.json.PlayJsonSupportForAkkaHttp._ +import skuber.json.format.apiobj.statusReads +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import skuber.json.format.deleteOptionsFmt +import DynamicKubernetesClientImpl.jsValueToRequestEntityMarshaller +import akka.util.ByteString +/** + * This is non-typed kubernetes client, for typed client see [[skuber.api.client.impl.KubernetesClientImpl]] + * This class provides a dynamic client for the Kubernetes API server. + * It is intended to be used for accessing resources / classes that are not part of the skuber library. + * + * It uses the Akka HTTP client to handle the requests to + * the Kubernetes API server. + */ +class DynamicKubernetesClientImpl(context: Context = Context(), + logConfig: LoggingConfig, + closeHook: Option[() => Unit], + poolSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) { + private val namespaceName = "default" + private val log = Logging.getLogger(actorSystem, "skuber.api") + private val requestAuth = context.authInfo + private val sslContext = TLS.establishSSLContext(context) + private val connectionContext: HttpsConnectionContext = sslContext + .map { ssl => + ConnectionContext.httpsClient { (host, port) => + val engine = ssl.createSSLEngine(host, port) + engine.setUseClientMode(true) + engine.setEnabledProtocols(Array("TLSv1.2", "TLSv1")) + engine + } + }.getOrElse(Http().defaultClientHttpsContext) + + private val clusterServer = context.cluster.server + + /** + * Get a resource from the Kubernetes API server + * + * @param name is the name of the resource to retrieve + * @param apiVersion is the api version of the resource type to retrieve, e.g: "apps/v1" + * @param resourcePlural is the plural name of the resource type to retrieve: e.g: "pods", "deployments" + * @param namespace is the namespace of the resource to retrieve + * */ + def get(name: String, + apiVersion: String, + resourcePlural: String, + namespace: Option[String] = None)(implicit lc: LoggingContext): Future[DynamicKubernetesObject] = { + _get(name, namespace, apiVersion, resourcePlural) + } + + /** + * Get a resource from the Kubernetes API server + * + * @param name is the name of the resource to retrieve + * @param namespace is the namespace of the resource to retrieve + * @param apiVersion is the api version of the resource type to retrieve, e.g: "apps/v1" + * @param resourcePlural is the plural name of the resource type to retrieve: e.g: "pods", "deployments" + * */ + def getOption(name: String, + namespace: Option[String] = None, + apiVersion: String, + resourcePlural: String)(implicit lc: LoggingContext): Future[Option[DynamicKubernetesObject]] = { + _get(name, namespace, apiVersion, resourcePlural) map { result => + Some(result) + } recover { + case ex: K8SException if ex.status.code.contains(StatusCodes.NotFound.intValue) => None + } + } + + /** + * Create a kubernetes resource + * + * @param rawInput is the raw json input of the object to create + * @param namespace is the namespace of the resource + * @param resourcePlural is the plural name of the resource type: e.g: "pods", "deployments" + * */ + def create(rawInput: JsonRaw, namespace: Option[String] = None, resourcePlural: String): Future[DynamicKubernetesObject] = { + modify( + method = HttpMethods.POST, + rawInput = rawInput, + namespace = namespace, + resourcePlural = resourcePlural + ) + } + + /** + * Update a resource from the Kubernetes API server + * + * @param rawInput is the raw json input of the object to create + * @param namespace is the namespace of the resource + * @param resourcePlural is the plural name of the resource type: e.g: "pods", "deployments" + * */ + def update(rawInput: JsonRaw, namespace: Option[String] = None, resourcePlural: String): Future[DynamicKubernetesObject] = { + modify( + method = HttpMethods.PUT, + rawInput = rawInput, + namespace = namespace, + resourcePlural = resourcePlural + ) + } + + /** + * List objects of specific resource kind in current namespace + * + * @param apiVersion is the api version of the resource type to retrieve, e.g: "apps/v1" + * @param resourcePlural is the plural name of the resource type to retrieve: e.g: "pods", "deployments" + * @param namespace is the namespace of the resource + * @param options see [[ListOptions]] + */ + def list(apiVersion: String, + resourcePlural: String, + namespace: Option[String] = None, + options: Option[ListOptions] = None): Future[DynamicKubernetesObjectList] = { + val queryOpt = options map { opts => + Uri.Query(opts.asMap) + } + val req = buildRequest(method = HttpMethods.GET, + apiVersion = apiVersion, + resourcePlural = resourcePlural, + query = queryOpt, + nameComponent = None, + namespace = namespace) + makeRequestReturningObjectResource[DynamicKubernetesObjectList](req) + } + + /** + * Delete a resource from the Kubernetes API server + * + * @param name resource name + * @param namespace is the namespace of the resource + * @param apiVersion is the api version of the resource type to retrieve, e.g: "apps/v1" + * @param resourcePlural is the plural name of the resource type to retrieve: e.g: "pods", "deployments" + * */ + def delete(name: String, namespace: Option[String] = None, apiVersion: String, resourcePlural: String): Future[Unit] = { + val options = DeleteOptions() + deleteWithOptions(name, options, namespace = namespace, apiVersion = apiVersion, resourcePlural = resourcePlural) + } + + /** + * Delete a resource from the Kubernetes API server + * + * @param name resource name + * @param options delete options see [[DeleteOptions]] + * @param namespace is the namespace of the resource + * @param apiVersion is the api version of the resource type to retrieve, e.g: "apps/v1" + * @param resourcePlural is the plural name of the resource type to retrieve: e.g: "pods", "deployments" + * */ + def deleteWithOptions(name: String, options: DeleteOptions, apiVersion: String, resourcePlural: String, namespace: Option[String] = None): Future[Unit] = { + val marshalledOptions = Marshal(options) + for { + requestEntity <- marshalledOptions.to[RequestEntity] + request = buildRequest(method = HttpMethods.DELETE, apiVersion = apiVersion, resourcePlural = resourcePlural, nameComponent = Some(name), namespace = namespace) + .withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) + response <- invoke(request) + responseStatusOpt <- checkResponseStatus(response) + _ <- ignoreResponseBody(response, responseStatusOpt) + } yield () + } + + // get API versions supported by the cluster + def getServerAPIVersions(implicit lc: LoggingContext): Future[List[String]] = { + val url = clusterServer + "/api" + val noAuthReq: HttpRequest = HttpRequest(method = HttpMethods.GET, uri = Uri(url)) + val request = HTTPRequestAuth.addAuth(noAuthReq, requestAuth) + for { + response <- invoke(request) + apiVersionResource <- toKubernetesResponse[DynamicKubernetesObject](response) + } yield apiVersionResource.jsonRaw.jsValue.as[List[String]] + } + + + private def modify(method: HttpMethod, + rawInput: JsonRaw, + resourcePlural: String, + namespace: Option[String])(implicit lc: LoggingContext, um: Unmarshaller[HttpResponse, DynamicKubernetesObject]): Future[DynamicKubernetesObject] = { + // if this is a POST we don't include the resource name in the URL + val nameComponent: Option[String] = method match { + case HttpMethods.POST => None + case _ => (rawInput.jsValue \ "metadata" \ "name").asOpt[String] + } + val apiVersion = (rawInput.jsValue \ "apiVersion").asOpt[String].getOrElse(throw new Exception(s"apiVersion not specified in raw input: $rawInput")) + + val marshal = Marshal(rawInput.jsValue) + for { + requestEntity <- marshal.to[RequestEntity] + httpRequest = buildRequest(method, apiVersion, resourcePlural, nameComponent, namespace = namespace) + .withEntity(requestEntity.withContentType(MediaTypes.`application/json`)) + newOrUpdatedResource <- makeRequestReturningObjectResource[DynamicKubernetesObject](httpRequest) + } yield newOrUpdatedResource + } + + private[skuber] def invoke(request: HttpRequest)(implicit lc: LoggingContext): Future[HttpResponse] = { + logInfo(logConfig.logRequestBasic, s"about to send HTTP request: ${request.method.value} ${request.uri.toString}") + val responseFut = Http().singleRequest(request, connectionContext = connectionContext) + responseFut onComplete { + case Success(response) => logInfo(logConfig.logResponseBasic, s"received response with HTTP status ${response.status.intValue()}") + case Failure(ex) => logError("HTTP request resulted in an unexpected exception", ex) + } + responseFut + } + + + private[skuber] def buildRequest(method: HttpMethod, + apiVersion: String, + resourcePlural: String, + nameComponent: Option[String], + query: Option[Uri.Query] = None, + namespace: Option[String]): HttpRequest = { + val nsPathComponent: Option[String] = + namespace match { + case Some(ns) => Some(s"namespaces/$ns") + case None => Some(s"namespaces/$namespaceName") + } + + val k8sUrlOptionalParts = List(clusterServer, + "apis", + apiVersion, + nsPathComponent, + resourcePlural, + nameComponent) + + val k8sUrlParts = k8sUrlOptionalParts collect { + case p: String if p != "" => p + case Some(p: String) if p != "" => p + } + + val k8sUrlStr = k8sUrlParts.mkString("/") + + val uri = query.map { q => + Uri(k8sUrlStr).withQuery(q) + }.getOrElse { + Uri(k8sUrlStr) + } + + val req: HttpRequest = HttpRequest(method = method, uri = uri) + HTTPRequestAuth.addAuth(req, requestAuth) + } + + private[skuber] def logInfo(enabledLogEvent: Boolean, msg: => String)(implicit lc: LoggingContext): Unit = { + if (log.isInfoEnabled && enabledLogEvent) { + log.info(s"[ ${lc.output} - ${msg}]") + } + } + + private[skuber] def logError(msg: String, ex: Throwable)(implicit lc: LoggingContext): Unit = { + log.error(ex, s"[ ${lc.output} - $msg ]") + } + + private[skuber] def makeRequestReturningObjectResource[T](httpRequest: HttpRequest)(implicit lc: LoggingContext, um: Unmarshaller[HttpResponse, T]): Future[T] = { + for { + httpResponse <- invoke(httpRequest) + result <- toKubernetesResponse[T](httpResponse) + } yield result + } + + + private[skuber] def toKubernetesResponse[T](response: HttpResponse)(implicit lc: LoggingContext, um: Unmarshaller[HttpResponse, T]): Future[T] = { + val statusOptFut = checkResponseStatus(response) + statusOptFut flatMap { + case Some(status) => + throw new K8SException(status) + case None => + try { + Unmarshal(response).to[T] + } + catch { + case ex: Exception => + logError("Unable to unmarshal resource from response", ex) + throw new K8SException(Status(message = Some("Error unmarshalling resource from response"), details = Some(JsString(ex.getMessage)))) + } + } + } + + private[api] def _get(name: String, + namespace: Option[String], + apiVersion: String, + resourcePlural: String)(implicit lc: LoggingContext): Future[DynamicKubernetesObject] = { + val req = buildRequest(HttpMethods.GET, apiVersion, resourcePlural, Some(name), namespace = namespace) + makeRequestReturningObjectResource[DynamicKubernetesObject](req) + } + + + // check for non-OK status, returning (in a Future) some Status object if not ok or otherwise None + private[skuber] def checkResponseStatus(response: HttpResponse): Future[Option[Status]] = { + response.status.intValue match { + case code if code < 300 => + Future.successful(None) + case code => + // a non-success or unexpected status returned - we should normally have a Status in the response body + val statusFut: Future[Status] = Unmarshal(response).to[Status] + statusFut map { status => + if (log.isInfoEnabled) + log.info(s"[Response: non-ok status returned - $status") + Some(status) + } recover { case ex => + if (log.isErrorEnabled) + log.error(s"[Response: could not read Status for non-ok response, exception : ${ex.getMessage}]") + val status: Status = Status(code = Some(response.status.intValue), + message = Some("Non-ok response and unable to parse Status from response body to get further details"), + details = Some(JsString(ex.getMessage))) + Some(status) + } + } + } + + /** + * Discards the response + * This is for requests (e.g. delete) for which we normally have no interest in the response body, but Akka Http + * requires us to drain it anyway + * (see https://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html) + * + * @param response the Http Response that we need to drain + * @return A Future[Unit] that will be set to Success or Failure depending on outcome of draining + */ + private def ignoreResponseBody(response: HttpResponse, responseStatusOpt: Option[Status]): Future[Unit] = { + responseStatusOpt match { + case Some(status) => + throw new K8SException(status) + case _ => + response.discardEntityBytes().future.map(_ => ()) + } + } +} + +object DynamicKubernetesClientImpl { + + def build(k8sContext: Option[Context] = None, + logConfig: Option[LoggingConfig] = None, + closeHook: Option[() => Unit] = None, + connectionPoolSettings: Option[ConnectionPoolSettings] = None) + (implicit actorSystem: ActorSystem, + executionContext: ExecutionContext): DynamicKubernetesClientImpl = { + val logConfFinal = logConfig.getOrElse(LoggingConfig()) + val connectionPoolSettingsFinal = connectionPoolSettings.getOrElse(ConnectionPoolSettings(actorSystem)) + val k8sContextFinal = k8sContext.getOrElse(defaultK8sConfig.currentContext) + new DynamicKubernetesClientImpl(k8sContextFinal, logConfFinal, closeHook, connectionPoolSettingsFinal) + } + + + implicit val jsValueToRequestEntityMarshaller: ToEntityMarshaller[JsValue] = + Marshaller.withFixedContentType(MediaTypes.`application/json`) { jsValue => + val jsonString = jsValue.toString() + HttpEntity.Strict(MediaTypes.`application/json`, ByteString(jsonString)) + } + +} diff --git a/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObject.scala b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObject.scala new file mode 100644 index 00000000..646e6fc3 --- /dev/null +++ b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObject.scala @@ -0,0 +1,27 @@ + +package skuber.api.dynamic.client.impl + +import play.api.libs.json._ +import skuber.ObjectMeta +import skuber.json.format._ + +// Dynamic kubernetes object with a raw json response from kubernetes api. +case class DynamicKubernetesObject(jsonRaw: JsonRaw, + apiVersion: Option[String], + kind: Option[String], + metadata: Option[ObjectMeta]) + +object DynamicKubernetesObject { + + implicit val dynamicKubernetesObjectFmt: Format[DynamicKubernetesObject] = new Format[DynamicKubernetesObject] { + override def writes(o: DynamicKubernetesObject): JsValue = Json.writes[DynamicKubernetesObject].writes(o) + + override def reads(json: JsValue): JsResult[DynamicKubernetesObject] = { + val apiVersion = (json \ "apiVersion").asOpt[String] + val kind = (json \ "kind").asOpt[String] + val metadata = (json \ "metadata").asOpt[ObjectMeta] + JsSuccess(DynamicKubernetesObject(JsonRaw(json), apiVersion, kind, metadata)) + } + + } +} diff --git a/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObjectList.scala b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObjectList.scala new file mode 100644 index 00000000..3fbc75f6 --- /dev/null +++ b/client/src/main/scala/skuber/api/dynamic/client/impl/DynamicKubernetesObjectList.scala @@ -0,0 +1,25 @@ + +package skuber.api.dynamic.client.impl + +import play.api.libs.json._ +import skuber.ObjectMeta +import skuber.json.format._ + +// Dynamic kubernetes object list with a raw json response from kubernetes api. +case class DynamicKubernetesObjectList(jsonRaw: JsonRaw, + resources: List[DynamicKubernetesObject]) + +object DynamicKubernetesObjectList { + + implicit val dynamicKubernetesObjectListFmt: Format[DynamicKubernetesObjectList] = new Format[DynamicKubernetesObjectList] { + override def writes(o: DynamicKubernetesObjectList): JsValue = Json.writes[DynamicKubernetesObjectList].writes(o) + + override def reads(json: JsValue): JsResult[DynamicKubernetesObjectList] = { + (json \ "items").asOpt[List[DynamicKubernetesObject]] match { + case Some(items) => JsSuccess(DynamicKubernetesObjectList(JsonRaw(json), items)) + case None => JsError("items field is missing") + } + } + + } +} diff --git a/client/src/main/scala/skuber/api/dynamic/client/impl/JsonRaw.scala b/client/src/main/scala/skuber/api/dynamic/client/impl/JsonRaw.scala new file mode 100644 index 00000000..61e22905 --- /dev/null +++ b/client/src/main/scala/skuber/api/dynamic/client/impl/JsonRaw.scala @@ -0,0 +1,10 @@ +package skuber.api.dynamic.client.impl + +import play.api.libs.json.{JsValue, Json, OFormat} + +case class JsonRaw(jsValue: JsValue) { + override def toString: String = Json.stringify(jsValue) +} +object JsonRaw { + implicit val jsonRawFmt: OFormat[JsonRaw] = Json.format[JsonRaw] +} diff --git a/docs/README.md b/docs/README.md index c105385d..401ef4e9 100644 --- a/docs/README.md +++ b/docs/README.md @@ -543,6 +543,96 @@ Supports `NetworkPolicy` resources (for Kubernetes v1.7 and above) - see Kuberne [Custom Resources](https://kubernetes.io/docs/concepts/api-extension/custom-resources/) are a powerful feature which enable Kubernetes clients to define and use their own custom resources to be treated in the same way as built-in kinds. They are useful for building Kubernetes operators and other advanced use cases. See the `CustomResourceSpec.scala` integration test which demonstrates how to use them in skuber. +### Dynamic Kubernetes Client +Dynamic Kubernetes Client is a client that can be used to interact with Kubernetes resources without having to define the resource types in the client. + +It is useful for interacting with resources that are not yet supported by skuber or for interacting with resources that are not known at compile time. + +Code example for using Dynamic Kubernetes Client `DynamicKubernetesClientImpl` + +```scala +import java.util.UUID.randomUUID +import akka.actor.ActorSystem +import play.api.libs.json.Json +import skuber.api.dynamic.client.impl.{DynamicKubernetesClientImpl, JsonRaw} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContextExecutor} + +object DynamicKubernetesClientImplExample extends App { + + implicit val system: ActorSystem = ActorSystem() + implicit val dispatcher: ExecutionContextExecutor = system.dispatcher + + private val deploymentName1: String = randomUUID().toString + + private val kubernetesDynamicClient = DynamicKubernetesClientImpl.build() + + private val createDeploymentInput = Json.parse { + s""" + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": "$deploymentName1" + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "nginx" + } + }, + "template": { + "metadata": { + "name": "nginx", + "labels": { + "app": "nginx" + } + }, + "spec": { + "containers": [ + { + "name": "nginx", + "image": "nginx:1.7.9", + "ports": [ + { + "containerPort": 80, + "protocol": "TCP" + } + ] + } + ] + } + } + } + }""".stripMargin + } + + + private val createdDeployment = kubernetesDynamicClient.create(JsonRaw(createDeploymentInput), resourcePlural = "deployments") + + val nameF = createdDeployment.flatMap { _ => + val getDeployment = kubernetesDynamicClient.get( + deploymentName1, + apiVersion = "apps/v1", + resourcePlural = "deployments") + getDeployment.map(_.metadata.map(_.name)) + } + + Await.result(nameF, 30.seconds) + + nameF.foreach { name => + println(s"Deployment name: $name") + } + + kubernetesDynamicClient.delete(deploymentName1, apiVersion = "apps/v1", resourcePlural = "deployments") + + Await.result(system.terminate(), 10.seconds) + +} + +``` + ### Custom resource Code example for adding a resource that not exist in skuber. diff --git a/examples/src/main/scala/skuber/examples/dynamicClient/DynamicKubernetesClientImplExample.scala b/examples/src/main/scala/skuber/examples/dynamicClient/DynamicKubernetesClientImplExample.scala new file mode 100644 index 00000000..f970ced2 --- /dev/null +++ b/examples/src/main/scala/skuber/examples/dynamicClient/DynamicKubernetesClientImplExample.scala @@ -0,0 +1,81 @@ +package skuber.examples.dynamicClient + +import java.util.UUID.randomUUID +import akka.actor.ActorSystem +import play.api.libs.json.Json +import skuber.api.dynamic.client.impl.{DynamicKubernetesClientImpl, JsonRaw} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContextExecutor} + +object DynamicKubernetesClientImplExample extends App { + + implicit val system: ActorSystem = ActorSystem() + implicit val dispatcher: ExecutionContextExecutor = system.dispatcher + + private val deploymentName1: String = randomUUID().toString + + private val kubernetesDynamicClient = DynamicKubernetesClientImpl.build() + + private val createDeploymentInput = Json.parse { + s""" + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": "$deploymentName1" + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "nginx" + } + }, + "template": { + "metadata": { + "name": "nginx", + "labels": { + "app": "nginx" + } + }, + "spec": { + "containers": [ + { + "name": "nginx", + "image": "nginx:1.7.9", + "ports": [ + { + "containerPort": 80, + "protocol": "TCP" + } + ] + } + ] + } + } + } + }""".stripMargin + } + + + private val createdDeployment = kubernetesDynamicClient.create(JsonRaw(createDeploymentInput), resourcePlural = "deployments") + + val nameF = createdDeployment.flatMap { _ => + val getDeployment = kubernetesDynamicClient.get( + deploymentName1, + apiVersion = "apps/v1", + resourcePlural = "deployments") + getDeployment.map(_.metadata.map(_.name)) + } + + Await.result(nameF, 30.seconds) + + nameF.foreach { name => + println(s"Deployment name: $name") + } + + kubernetesDynamicClient.delete(deploymentName1, apiVersion = "apps/v1", resourcePlural = "deployments") + + Await.result(system.terminate(), 10.seconds) + +}