Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AwsAuthRefreshable - aws refreshable token #163

Merged
merged 12 commits into from
Jun 23, 2022
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ Skuber is a Scala client library for [Kubernetes](http://kubernetes.io). It prov


## Features

- Uses standard `kubeconfig` files for configuration - see the [configuration guide](docs/Configuration.md) for details
- Refreshing EKS tokens [Refresh EKS Token guide](docs/Refresh_EKS_AWS_Token.md)
- Comprehensive support for Kubernetes API model represented as Scala case classes
- Support for core, extensions and other Kubernetes API groups
- Full support for converting resources between the case class and standard JSON representations
- Client API for creating, reading, updating, removing, listing and watching resources on a Kubernetes cluster
- The API is asynchronous and strongly typed e.g. `k8s get[Deployment]("nginx")` returns a value of type `Future[Deployment]`
- Fluent API for creating and updating specifications of Kubernetes resources
- Uses standard `kubeconfig` files for configuration - see the [configuration guide](docs/Configuration.md) for details

See the [programming guide](docs/GUIDE.md) for more details.

Expand Down Expand Up @@ -147,18 +147,18 @@ ci.yaml and clean.yaml are generated automatically with [sbt-github-actions](htt
Run `sbt githubWorkflowGenerate && bash infra/ci/fix-workflows.sh` in order to regenerate ci.yaml and clean.yaml.

CI Running against the following k8s versions

skuber supports all other k8s versions, not all of them tested under CI.

https://kubernetes.io/releases/

* v1.19.6
* v1.19.6
* v1.20.11
* v1.21.5
* v1.22.9
* v1.23.6
* v1.24.1

skuber supports all other k8s versions, not all of them tested under CI.

https://kubernetes.io/releases/



## License

Expand Down
22 changes: 20 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sbtassembly.AssemblyKeys.assembly
import sbtassembly.{MergeStrategy, PathList}
import xerial.sbt.Sonatype._
resolvers += "Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/"

Expand Down Expand Up @@ -37,6 +39,10 @@ val logback = "ch.qos.logback" % "logback-classic" % "1.2.11" % Runtime
// the Json formatters are based on Play Json
val playJson = "com.typesafe.play" %% "play-json" % "2.9.2"

val awsJavaSdkCore = "com.amazonaws" % "aws-java-sdk-core" % "1.12.233"
val awsJavaSdkSts = "com.amazonaws" % "aws-java-sdk-sts" % "1.12.233"
val apacheCommonsLogging = "commons-logging" % "commons-logging" % "1.2"

// Need Java 8 or later as the java.time package is used to represent K8S timestamps
scalacOptions += "-target:jvm-1.8"

Expand All @@ -52,7 +58,7 @@ ThisBuild / homepage := Some(url("https://github.com/hagay3"))

publishTo := sonatypePublishToBundle.value
sonatypeCredentialHost := Sonatype.sonatype01
updateOptions in ThisBuild := updateOptions.value.withGigahorse(false)
ThisBuild / updateOptions := updateOptions.value.withGigahorse(false)

sonatypeProjectHosting := Some(GitHubHosting("hagay3", "skuber", "[email protected]"))

Expand Down Expand Up @@ -138,6 +144,7 @@ lazy val skuberSettings = Seq(
name := "skuber",
libraryDependencies ++= Seq(
akkaHttp, akkaStream, playJson, snakeYaml, commonsIO, commonsCodec, bouncyCastle,
awsJavaSdkCore, awsJavaSdkSts, apacheCommonsLogging,
scalaCheck % Test, specs2 % Test, mockito % Test, akkaStreamTestKit % Test,
scalaTest % Test
).map(_.exclude("commons-logging", "commons-logging"))
Expand Down Expand Up @@ -174,7 +181,18 @@ lazy val skuber = (project in file("client"))
lazy val examples = (project in file("examples"))
.settings(
commonSettings,
mergeStrategy,
crossScalaVersions := supportedScalaVersion)
.settings(examplesSettings: _*)
.settings(examplesAssemblySettings: _*)
.dependsOn(skuber)
.dependsOn(skuber)

val mergeStrategy = Seq(
assembly / assemblyMergeStrategy := {
case PathList("module-info.class") => MergeStrategy.last
case path if path.endsWith("/module-info.class") => MergeStrategy.last
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
}
)
21 changes: 11 additions & 10 deletions client/src/main/scala/skuber/api/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package skuber.api

import java.io.File
import java.net.URL
import java.time.Instant
import java.time.format.DateTimeFormatter

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.Failure
import java.util.{Base64, Date}

import org.yaml.snakeyaml.Yaml
import skuber.Namespace
import skuber.api.client._

import scala.io.Source

/**
Expand Down Expand Up @@ -149,22 +145,27 @@ object Configuration {
}
}

def topLevelYamlToK8SConfigMap[K8SConfigKind](kind: String, toK8SConfig: YamlMap=> K8SConfigKind) =
topLevelList(kind + "s").asScala.map(item => name(item) -> toK8SConfig(child(item, kind))).toMap
def topLevelYamlToK8SConfigMap[K8SConfigKind](kind: String, toK8SConfig: (YamlMap, String) => K8SConfigKind): Map[String, K8SConfigKind] = {
topLevelList(kind + "s").asScala.map{ item =>
val clusterName = name(item)
name(item) -> toK8SConfig(child(item, kind), clusterName)
}.toMap
}


def toK8SCluster(clusterConfig: YamlMap) =
def toK8SCluster(clusterConfig: YamlMap, clusterName: String) =
Cluster(
apiVersion=valueAt(clusterConfig, "api-version", Some("v1")),
server=valueAt(clusterConfig,"server",Some("http://localhost:8001")),
insecureSkipTLSVerify=valueAt(clusterConfig,"insecure-skip-tls-verify",Some(false)),
certificateAuthority=pathOrDataValueAt(clusterConfig, "certificate-authority","certificate-authority-data")
certificateAuthority=pathOrDataValueAt(clusterConfig, "certificate-authority","certificate-authority-data"),
clusterName = Some(clusterName)
)


val k8sClusterMap = topLevelYamlToK8SConfigMap("cluster", toK8SCluster)

def toK8SAuthInfo(userConfig:YamlMap): AuthInfo = {
def toK8SAuthInfo(userConfig:YamlMap, clusterName: String): AuthInfo = {

def authProviderRead(authProvider: YamlMap): Option[AuthProviderAuth] = {
val config = child(authProvider, "config")
Expand Down Expand Up @@ -207,7 +208,7 @@ object Configuration {
}
val k8sAuthInfoMap = topLevelYamlToK8SConfigMap("user", toK8SAuthInfo)

def toK8SContext(contextConfig: YamlMap) = {
def toK8SContext(contextConfig: YamlMap, clusterName: String) = {
val cluster=contextConfig.asScala.get("cluster").filterNot(_ == "").map { clusterName =>
k8sClusterMap.get(clusterName.asInstanceOf[String]).get
}.getOrElse(Cluster())
Expand Down
26 changes: 17 additions & 9 deletions client/src/main/scala/skuber/api/client/Cluster.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package skuber.api.client

import com.amazonaws.regions.Regions

/**
* @author David O'Riordan
*
* Defines the details needed to communicate with the API server for a Kubernetes cluster
*/
* @author David O'Riordan
*
* Defines the details needed to communicate with the API server for a Kubernetes cluster
*/
case class Cluster(
apiVersion: String = "v1",
server: String = defaultApiServerURL,
insecureSkipTLSVerify: Boolean = false,
certificateAuthority: Option[PathOrData] = None
)
apiVersion: String = "v1",
server: String = defaultApiServerURL,
insecureSkipTLSVerify: Boolean = false,
certificateAuthority: Option[PathOrData] = None,
clusterName: Option[String] = None,
awsRegion: Option[Regions] = None
) {
def withName(name: String): Cluster = this.copy(clusterName = Some(name))

def withAwsRegion(region: Regions): Cluster = this.copy(awsRegion = Some(region))
}
51 changes: 32 additions & 19 deletions client/src/main/scala/skuber/api/client/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package skuber.api

import java.time.Instant
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.http.scaladsl.model.{HttpCharsets, HttpRequest, HttpResponse, MediaType}
import akka.stream.scaladsl.Flow
import com.typesafe.config.{Config, ConfigFactory}
import org.joda.time.DateTime
import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
import play.api.libs.json._

import skuber.ObjectResource
import skuber.api.client.impl.KubernetesClientImpl
import skuber.api.client.token.RefreshableToken
import scala.sys.SystemProperties
import scala.util.Try
import skuber.{LabelSelector, ObjectResource}
import skuber.api.client.impl.KubernetesClientImpl

/**
* @author David O'Riordan
Expand Down Expand Up @@ -80,8 +79,15 @@ package object client {
.mkString
}

sealed trait AuthProviderAuth extends AccessTokenAuth {
trait AuthProviderAuth extends AccessTokenAuth {
def name: String
}

trait AuthProviderRefreshableAuth extends AuthProviderAuth {
def refreshToken: RefreshableToken
def generateToken: String
def name: String
def isTokenExpired(refreshableToken: RefreshableToken): Boolean
}

// 'jwt' supports an oidc id token per https://kubernetes.io/docs/admin/authentication/#option-1---oidc-authenticator
Expand All @@ -94,35 +100,44 @@ package object client {
override def toString = """OidcAuth(idToken=<redacted>)"""
}

final case class GcpAuth private(private val config: GcpConfiguration) extends AuthProviderAuth {
final case class GcpAuth private(private val config: GcpConfiguration) extends AuthProviderRefreshableAuth {
override val name = "gcp"

@volatile private var refresh: Option[GcpRefresh] = config.cachedAccessToken.map(token => GcpRefresh(token.accessToken, token.expiry))
@volatile private var refresh: Option[RefreshableToken] = config.cachedAccessToken.map(token => GcpRefresh(token.accessToken, token.expiry).toRefreshableToken)

private def refreshGcpToken(): GcpRefresh = {
val output = config.cmd.execute()
val parsed = Json.parse(output).as[GcpRefresh]
override def refreshToken: RefreshableToken = {
val output = generateToken
val parsed = Json.parse(output).as[GcpRefresh].toRefreshableToken
refresh = Some(parsed)
parsed
}

def accessToken: String = this.synchronized {
refresh match {
case Some(expired) if expired.expired =>
refreshGcpToken().accessToken
case Some(token) if isTokenExpired(token) =>
refreshToken.accessToken
case None =>
refreshGcpToken().accessToken
refreshToken.accessToken
case Some(token) =>
token.accessToken
}
}

override def toString =
"""GcpAuth(accessToken=<redacted>)""".stripMargin

override def isTokenExpired(refreshableToken: RefreshableToken): Boolean = {
DateTime.now.isAfter(refreshableToken.expiry.minusSeconds(20))
}
override def generateToken: String = config.cmd.execute()
}

final private[client] case class GcpRefresh(accessToken: String, expiry: Instant) {
def expired: Boolean = Instant.now.isAfter(expiry.minusSeconds(20))

def toRefreshableToken: RefreshableToken = {
val expirationDate = new DateTime(this.expiry.toEpochMilli)
RefreshableToken(accessToken = this.accessToken, expiry = expirationDate)
}
}

private[client] object GcpRefresh {
Expand All @@ -136,9 +151,7 @@ package object client {

final case class GcpConfiguration(cachedAccessToken: Option[GcpCachedAccessToken], cmd: GcpCommand)

final case class GcpCachedAccessToken(accessToken: String, expiry: Instant) {
def expired: Boolean = Instant.now.isAfter(expiry.minusSeconds(20))
}
final case class GcpCachedAccessToken(accessToken: String, expiry: Instant)

final case class GcpCommand(cmd: String, args: String) {

Expand Down
Loading