Skip to content

Commit

Permalink
Merge pull request #163 from hagay3/feature/hagai/awsTokenRefresh
Browse files Browse the repository at this point in the history
AwsAuthRefreshable - aws refreshable token
  • Loading branch information
hagay3 authored Jun 23, 2022
2 parents 73a189b + 96f5ac4 commit eec68ac
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 51 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ Skuber is a Scala client library for [Kubernetes](http://kubernetes.io). It prov


## Features

- Uses standard `kubeconfig` files for configuration - see the [configuration guide](docs/Configuration.md) for details
- Refreshing EKS tokens [Refresh EKS Token guide](docs/Refresh_EKS_AWS_Token.md)
- Comprehensive support for Kubernetes API model represented as Scala case classes
- Support for core, extensions and other Kubernetes API groups
- Full support for converting resources between the case class and standard JSON representations
- Client API for creating, reading, updating, removing, listing and watching resources on a Kubernetes cluster
- The API is asynchronous and strongly typed e.g. `k8s get[Deployment]("nginx")` returns a value of type `Future[Deployment]`
- Fluent API for creating and updating specifications of Kubernetes resources
- Uses standard `kubeconfig` files for configuration - see the [configuration guide](docs/Configuration.md) for details

See the [programming guide](docs/GUIDE.md) for more details.

Expand Down Expand Up @@ -147,18 +147,18 @@ ci.yaml and clean.yaml are generated automatically with [sbt-github-actions](htt
Run `sbt githubWorkflowGenerate && bash infra/ci/fix-workflows.sh` in order to regenerate ci.yaml and clean.yaml.
CI Running against the following k8s versions
skuber supports all other k8s versions, not all of them tested under CI.
https://kubernetes.io/releases/
* v1.19.6
* v1.19.6
* v1.20.11
* v1.21.5
* v1.22.9
* v1.23.6
* v1.24.1
skuber supports all other k8s versions, not all of them tested under CI.
https://kubernetes.io/releases/
## License
Expand Down
22 changes: 20 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sbtassembly.AssemblyKeys.assembly
import sbtassembly.{MergeStrategy, PathList}
import xerial.sbt.Sonatype._
resolvers += "Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/"

Expand Down Expand Up @@ -37,6 +39,10 @@ val logback = "ch.qos.logback" % "logback-classic" % "1.2.11" % Runtime
// the Json formatters are based on Play Json
val playJson = "com.typesafe.play" %% "play-json" % "2.9.2"

val awsJavaSdkCore = "com.amazonaws" % "aws-java-sdk-core" % "1.12.233"
val awsJavaSdkSts = "com.amazonaws" % "aws-java-sdk-sts" % "1.12.233"
val apacheCommonsLogging = "commons-logging" % "commons-logging" % "1.2"

// Need Java 8 or later as the java.time package is used to represent K8S timestamps
scalacOptions += "-target:jvm-1.8"

Expand All @@ -52,7 +58,7 @@ ThisBuild / homepage := Some(url("https://github.com/hagay3"))

publishTo := sonatypePublishToBundle.value
sonatypeCredentialHost := Sonatype.sonatype01
updateOptions in ThisBuild := updateOptions.value.withGigahorse(false)
ThisBuild / updateOptions := updateOptions.value.withGigahorse(false)

sonatypeProjectHosting := Some(GitHubHosting("hagay3", "skuber", "[email protected]"))

Expand Down Expand Up @@ -138,6 +144,7 @@ lazy val skuberSettings = Seq(
name := "skuber",
libraryDependencies ++= Seq(
akkaHttp, akkaStream, playJson, snakeYaml, commonsIO, commonsCodec, bouncyCastle,
awsJavaSdkCore, awsJavaSdkSts, apacheCommonsLogging,
scalaCheck % Test, specs2 % Test, mockito % Test, akkaStreamTestKit % Test,
scalaTest % Test
).map(_.exclude("commons-logging", "commons-logging"))
Expand Down Expand Up @@ -174,7 +181,18 @@ lazy val skuber = (project in file("client"))
lazy val examples = (project in file("examples"))
.settings(
commonSettings,
mergeStrategy,
crossScalaVersions := supportedScalaVersion)
.settings(examplesSettings: _*)
.settings(examplesAssemblySettings: _*)
.dependsOn(skuber)
.dependsOn(skuber)

val mergeStrategy = Seq(
assembly / assemblyMergeStrategy := {
case PathList("module-info.class") => MergeStrategy.last
case path if path.endsWith("/module-info.class") => MergeStrategy.last
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
}
)
21 changes: 11 additions & 10 deletions client/src/main/scala/skuber/api/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package skuber.api

import java.io.File
import java.net.URL
import java.time.Instant
import java.time.format.DateTimeFormatter

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.Failure
import java.util.{Base64, Date}

import org.yaml.snakeyaml.Yaml
import skuber.Namespace
import skuber.api.client._

import scala.io.Source

/**
Expand Down Expand Up @@ -149,22 +145,27 @@ object Configuration {
}
}

def topLevelYamlToK8SConfigMap[K8SConfigKind](kind: String, toK8SConfig: YamlMap=> K8SConfigKind) =
topLevelList(kind + "s").asScala.map(item => name(item) -> toK8SConfig(child(item, kind))).toMap
def topLevelYamlToK8SConfigMap[K8SConfigKind](kind: String, toK8SConfig: (YamlMap, String) => K8SConfigKind): Map[String, K8SConfigKind] = {
topLevelList(kind + "s").asScala.map{ item =>
val clusterName = name(item)
name(item) -> toK8SConfig(child(item, kind), clusterName)
}.toMap
}


def toK8SCluster(clusterConfig: YamlMap) =
def toK8SCluster(clusterConfig: YamlMap, clusterName: String) =
Cluster(
apiVersion=valueAt(clusterConfig, "api-version", Some("v1")),
server=valueAt(clusterConfig,"server",Some("http://localhost:8001")),
insecureSkipTLSVerify=valueAt(clusterConfig,"insecure-skip-tls-verify",Some(false)),
certificateAuthority=pathOrDataValueAt(clusterConfig, "certificate-authority","certificate-authority-data")
certificateAuthority=pathOrDataValueAt(clusterConfig, "certificate-authority","certificate-authority-data"),
clusterName = Some(clusterName)
)


val k8sClusterMap = topLevelYamlToK8SConfigMap("cluster", toK8SCluster)

def toK8SAuthInfo(userConfig:YamlMap): AuthInfo = {
def toK8SAuthInfo(userConfig:YamlMap, clusterName: String): AuthInfo = {

def authProviderRead(authProvider: YamlMap): Option[AuthProviderAuth] = {
val config = child(authProvider, "config")
Expand Down Expand Up @@ -207,7 +208,7 @@ object Configuration {
}
val k8sAuthInfoMap = topLevelYamlToK8SConfigMap("user", toK8SAuthInfo)

def toK8SContext(contextConfig: YamlMap) = {
def toK8SContext(contextConfig: YamlMap, clusterName: String) = {
val cluster=contextConfig.asScala.get("cluster").filterNot(_ == "").map { clusterName =>
k8sClusterMap.get(clusterName.asInstanceOf[String]).get
}.getOrElse(Cluster())
Expand Down
26 changes: 17 additions & 9 deletions client/src/main/scala/skuber/api/client/Cluster.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package skuber.api.client

import com.amazonaws.regions.Regions

/**
* @author David O'Riordan
*
* Defines the details needed to communicate with the API server for a Kubernetes cluster
*/
* @author David O'Riordan
*
* Defines the details needed to communicate with the API server for a Kubernetes cluster
*/
case class Cluster(
apiVersion: String = "v1",
server: String = defaultApiServerURL,
insecureSkipTLSVerify: Boolean = false,
certificateAuthority: Option[PathOrData] = None
)
apiVersion: String = "v1",
server: String = defaultApiServerURL,
insecureSkipTLSVerify: Boolean = false,
certificateAuthority: Option[PathOrData] = None,
clusterName: Option[String] = None,
awsRegion: Option[Regions] = None
) {
def withName(name: String): Cluster = this.copy(clusterName = Some(name))

def withAwsRegion(region: Regions): Cluster = this.copy(awsRegion = Some(region))
}
51 changes: 32 additions & 19 deletions client/src/main/scala/skuber/api/client/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package skuber.api

import java.time.Instant
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.http.scaladsl.model.{HttpCharsets, HttpRequest, HttpResponse, MediaType}
import akka.stream.scaladsl.Flow
import com.typesafe.config.{Config, ConfigFactory}
import org.joda.time.DateTime
import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
import play.api.libs.json._

import skuber.ObjectResource
import skuber.api.client.impl.KubernetesClientImpl
import skuber.api.client.token.RefreshableToken
import scala.sys.SystemProperties
import scala.util.Try
import skuber.{LabelSelector, ObjectResource}
import skuber.api.client.impl.KubernetesClientImpl

/**
* @author David O'Riordan
Expand Down Expand Up @@ -80,8 +79,15 @@ package object client {
.mkString
}

sealed trait AuthProviderAuth extends AccessTokenAuth {
trait AuthProviderAuth extends AccessTokenAuth {
def name: String
}

trait AuthProviderRefreshableAuth extends AuthProviderAuth {
def refreshToken: RefreshableToken
def generateToken: String
def name: String
def isTokenExpired(refreshableToken: RefreshableToken): Boolean
}

// 'jwt' supports an oidc id token per https://kubernetes.io/docs/admin/authentication/#option-1---oidc-authenticator
Expand All @@ -94,35 +100,44 @@ package object client {
override def toString = """OidcAuth(idToken=<redacted>)"""
}

final case class GcpAuth private(private val config: GcpConfiguration) extends AuthProviderAuth {
final case class GcpAuth private(private val config: GcpConfiguration) extends AuthProviderRefreshableAuth {
override val name = "gcp"

@volatile private var refresh: Option[GcpRefresh] = config.cachedAccessToken.map(token => GcpRefresh(token.accessToken, token.expiry))
@volatile private var refresh: Option[RefreshableToken] = config.cachedAccessToken.map(token => GcpRefresh(token.accessToken, token.expiry).toRefreshableToken)

private def refreshGcpToken(): GcpRefresh = {
val output = config.cmd.execute()
val parsed = Json.parse(output).as[GcpRefresh]
override def refreshToken: RefreshableToken = {
val output = generateToken
val parsed = Json.parse(output).as[GcpRefresh].toRefreshableToken
refresh = Some(parsed)
parsed
}

def accessToken: String = this.synchronized {
refresh match {
case Some(expired) if expired.expired =>
refreshGcpToken().accessToken
case Some(token) if isTokenExpired(token) =>
refreshToken.accessToken
case None =>
refreshGcpToken().accessToken
refreshToken.accessToken
case Some(token) =>
token.accessToken
}
}

override def toString =
"""GcpAuth(accessToken=<redacted>)""".stripMargin

override def isTokenExpired(refreshableToken: RefreshableToken): Boolean = {
DateTime.now.isAfter(refreshableToken.expiry.minusSeconds(20))
}
override def generateToken: String = config.cmd.execute()
}

final private[client] case class GcpRefresh(accessToken: String, expiry: Instant) {
def expired: Boolean = Instant.now.isAfter(expiry.minusSeconds(20))

def toRefreshableToken: RefreshableToken = {
val expirationDate = new DateTime(this.expiry.toEpochMilli)
RefreshableToken(accessToken = this.accessToken, expiry = expirationDate)
}
}

private[client] object GcpRefresh {
Expand All @@ -136,9 +151,7 @@ package object client {

final case class GcpConfiguration(cachedAccessToken: Option[GcpCachedAccessToken], cmd: GcpCommand)

final case class GcpCachedAccessToken(accessToken: String, expiry: Instant) {
def expired: Boolean = Instant.now.isAfter(expiry.minusSeconds(20))
}
final case class GcpCachedAccessToken(accessToken: String, expiry: Instant)

final case class GcpCommand(cmd: String, args: String) {

Expand Down
Loading

0 comments on commit eec68ac

Please sign in to comment.