Skip to content

Commit

Permalink
Merge pull request #218 from jahzielHA/issue-204-bound-service-account
Browse files Browse the repository at this point in the history
Issue 204 - Bound tokens for incluster configuration
  • Loading branch information
hagay3 authored Sep 8, 2022
2 parents 1abf848 + 1b25012 commit 7cce0fd
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 33 deletions.
4 changes: 4 additions & 0 deletions client/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ skuber {
# reclaim the unused resources.
pool-idle-timeout = 30s
}

in-cluster {
refresh-token-interval = 5m
}
}
23 changes: 14 additions & 9 deletions client/src/main/scala/skuber/api/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package skuber.api

import org.yaml.snakeyaml.Yaml
import skuber.Namespace
import skuber.api.client._
import skuber.api.client.token.{FileTokenAuthRefreshable, FileTokenConfiguration}
import skuber.config.SkuberConfig

import java.net.URL
import java.time.Instant
import java.time.format.DateTimeFormatter
import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.Failure
import java.util.{Base64, Date}
import org.yaml.snakeyaml.Yaml
import skuber.Namespace
import skuber.api.client._
import scala.collection.JavaConverters._
import scala.concurrent.duration.{Duration, DurationInt}
import scala.io.Source
import scala.util.{Failure, Try}

/**
* @author David O'Riordan
Expand Down Expand Up @@ -229,7 +232,6 @@ object Configuration {
* https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/util/ClientBuilder.java#L134
*/
lazy val inClusterConfig: Try[Configuration] = {

val rootK8sFolder = "/var/run/secrets/kubernetes.io/serviceaccount"
val tokenPath = s"$rootK8sFolder/token"
val namespacePath = s"$rootK8sFolder/namespace"
Expand All @@ -251,14 +253,18 @@ object Configuration {
//"Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
lazy val ca: Option[PathOrData] = if (Files.exists(Paths.get(caPath))) Some(Left(caPath)) else None

lazy val refreshTokenInterval: Duration = SkuberConfig.load().getDuration("in-config.refresh-token-interval", 5.minutes)

for {
host <- maybeHost
port <- maybePort
token <- maybeToken
namespace <- maybeNamespace
hostPort = s"https://$host${if (port.length > 0) ":" + port else ""}"
cluster = Cluster(server = hostPort, certificateAuthority = ca)
ctx = Context(cluster, TokenAuth(token), Namespace.forName(namespace))
ctx = Context(cluster = cluster,
authInfo = FileTokenAuthRefreshable(FileTokenConfiguration(cachedAccessToken= Some(token), tokenPath = tokenPath, refreshTokenInterval)),
namespace = Namespace.forName(namespace))
} yield Configuration(clusters = Map("default" -> cluster),
contexts = Map("default" -> ctx),
currentContext = ctx)
Expand Down Expand Up @@ -297,5 +303,4 @@ object Configuration {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object PodExecImpl {
requestContext.log.info(s"Connected to container ${containerPrintName} of pod ${podName}")
close.future.foreach { _ =>
requestContext.log.info(s"Close the connection of container ${containerPrintName} of pod ${podName}")
promise.success(None)
promise.trySuccess(None)
}
}
Future.sequence(Seq(connected, close.future, promise.future)).map { _ => () }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import skuber.api.watch.{LongPollingPool, Watch, WatchSource}
import skuber.json.PlayJsonSupportForAkkaHttp._
import skuber.json.format.apiobj.statusReads
import skuber.json.format.{apiVersionsFormatReads, deleteOptionsFmt, namespaceListFmt}

import javax.net.ssl.SSLContext
import skuber.apiextensions.CustomResourceDefinition.Scope
import skuber.config.SkuberConfig

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -673,44 +676,28 @@ class KubernetesClientImpl private[client] (val requestMaker: (Uri, HttpMethod)
response.discardEntityBytes().future.map(done => ())
}
}

}

object KubernetesClientImpl {

def apply(k8sContext: Context, logConfig: LoggingConfig, closeHook: Option[() => Unit], appConfig: Config)
(implicit actorSystem: ActorSystem): KubernetesClientImpl =
{
appConfig.checkValid(ConfigFactory.defaultReference(), "skuber")

def getSkuberConfig[T](key: String, fromConfig: String => Option[T], default: T): T = {
val skuberConfigKey = s"skuber.$key"
if (appConfig.getIsNull(skuberConfigKey)) {
default
} else {
fromConfig(skuberConfigKey) match {
case None => default
case Some(t) => t
}
}
}
val skuberConfig = SkuberConfig.load(appConfig)

def dispatcherFromConfig(configKey: String): Option[ExecutionContext] = if (appConfig.getString(configKey).isEmpty) {
None
} else {
Some(actorSystem.dispatchers.lookup(appConfig.getString(configKey)))
}

implicit val dispatcher: ExecutionContext = getSkuberConfig("akka.dispatcher", dispatcherFromConfig, actorSystem.dispatcher)

def durationFomConfig(configKey: String): Option[Duration] = Some(Duration.fromNanos(appConfig.getDuration(configKey).toNanos))

val watchIdleTimeout: Duration = getSkuberConfig("watch.idle-timeout", durationFomConfig, Duration.Inf)
val podLogIdleTimeout: Duration = getSkuberConfig("pod-log.idle-timeout", durationFomConfig, Duration.Inf)
implicit val dispatcher: ExecutionContext = skuberConfig.getSkuberConfig("akka.dispatcher", dispatcherFromConfig, actorSystem.dispatcher)

val watchContinuouslyRequestTimeout: Duration = getSkuberConfig("watch-continuously.request-timeout", durationFomConfig, 30.seconds)
val watchContinuouslyIdleTimeout: Duration = getSkuberConfig("watch-continuously.idle-timeout", durationFomConfig, 60.seconds)
val watchPoolIdleTimeout: Duration = getSkuberConfig("watch-continuously.pool-idle-timeout", durationFomConfig, 60.seconds)
val watchIdleTimeout: Duration = skuberConfig.getDuration("watch.idle-timeout", Duration.Inf)
val podLogIdleTimeout: Duration = skuberConfig.getDuration("pod-log.idle-timeout", Duration.Inf)
val watchContinuouslyRequestTimeout: Duration = skuberConfig.getDuration("watch-continuously.request-timeout", 30.seconds)
val watchContinuouslyIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.idle-timeout", 60.seconds)
val watchPoolIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.pool-idle-timeout", 60.seconds)

//The watch idle timeout needs to be greater than watch api request timeout
require(watchContinuouslyIdleTimeout > watchContinuouslyRequestTimeout)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package skuber.api.client.token

import org.joda.time.DateTime
import skuber.K8SException
import skuber.api.client.{AuthProviderRefreshableAuth, Status}

import scala.concurrent.duration.{Duration, DurationInt}
import scala.util.{Failure, Success}

final case class FileTokenAuthRefreshable(config: FileTokenConfiguration) extends TokenAuthRefreshable with FileReaderComponent {}

final case class FileTokenConfiguration(
cachedAccessToken: Option[String],
tokenPath: String,
refreshInterval: Duration = 5.minutes,
)

trait TokenAuthRefreshable extends AuthProviderRefreshableAuth { self: ContentReaderComponent =>
val config: FileTokenConfiguration

private val refreshInterval: Duration = config.refreshInterval
@volatile private var cachedToken: Option[RefreshableToken] = config.cachedAccessToken.map(buildRefreshableToken)

private val tokenPath: String = config.tokenPath

override def name: String = "file-token"
override def toString: String = """FileTokenAuthRefreshable(accessToken=<redacted>)""".stripMargin

override def refreshToken: RefreshableToken = {
val refreshedToken = buildRefreshableToken(generateToken)
cachedToken = Some(refreshedToken)
refreshedToken
}

override def generateToken: String = {
val maybeToken = contentReader.read(tokenPath)
maybeToken match {
case Success(token) => token
case Failure(e) => throw new K8SException(Status(reason = Option(e.getMessage)))
}
}

override def isTokenExpired(refreshableToken: RefreshableToken): Boolean =
refreshableToken.expiry.isBefore(System.currentTimeMillis)

override def accessToken: String = this.synchronized {
cachedToken match {
case Some(token) if isTokenExpired(token) =>
refreshToken.accessToken
case None =>
refreshToken.accessToken
case Some(token) =>
token.accessToken
}
}

private def buildRefreshableToken(accessToken: String): RefreshableToken = {
RefreshableToken(accessToken, DateTime.now.plus(refreshInterval.toMillis))
}
}

26 changes: 26 additions & 0 deletions client/src/main/scala/skuber/api/client/token/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package skuber.api.client

import scala.io.Source
import scala.util.Try

package object token {
trait ContentReaderComponent {
val contentReader: ContentReader

trait ContentReader {
def read(filePath: String): Try[String]
}
}

trait FileReaderComponent extends ContentReaderComponent {
val contentReader: ContentReader = new FileContentReader

class FileContentReader extends ContentReader {
def read(filePath: String): Try[String] = for {
source <- Try(Source.fromFile(filePath, "utf-8"))
content <- Try(source.getLines().mkString("\n"))
_ <- Try(source.close())
} yield content
}
}
}
32 changes: 32 additions & 0 deletions client/src/main/scala/skuber/config/SkuberConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package skuber.config

import com.typesafe.config.{Config, ConfigFactory}
import skuber.config.SkuberConfig.skuberKeyPath

import scala.concurrent.duration.Duration

case class SkuberConfig(appConfig: Config) {
def getSkuberConfig[T](key: String, fromConfig: String => Option[T], default: T): T = {
val skuberConfigKey = s"$skuberKeyPath.$key"
if (appConfig.getIsNull(skuberConfigKey)) {
default
} else {
fromConfig(skuberConfigKey) match {
case None => default
case Some(t) => t
}
}
}

def getDuration(configKey: String, default: Duration = Duration.Inf): Duration = getSkuberConfig(configKey, durationFromConfig, default)
def durationFromConfig(configKey: String): Option[Duration] = Some(Duration.fromNanos(appConfig.getDuration(configKey).toNanos))
}

object SkuberConfig {
final val skuberKeyPath = "skuber"

def load(appConfig: Config = ConfigFactory.load()): SkuberConfig = {
appConfig.checkValid(ConfigFactory.defaultReference(), skuberKeyPath)
SkuberConfig(appConfig)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package skuber.api.client.token

import org.joda.time.DateTime
import org.specs2.mutable.Specification

import scala.concurrent.duration.DurationInt
import scala.util.Try

class FileTokenAuthRefreshableSpec extends Specification {
"This is a specification for the 'FileTokenAuthRefreshable' class".txt

trait MockFileReaderComponent extends ContentReaderComponent {
val contentReader: ContentReader = new MockFileReaderComponent

class MockFileReaderComponent extends ContentReader {
def read(filePath: String): Try[String] = Try(DateTime.now.toString())
}
}

final case class MockFileTokenAuthRefreshable(config: FileTokenConfiguration) extends TokenAuthRefreshable with MockFileReaderComponent {}

"FileTokenAuthRefreshable" should {
"Retrieve the token if none provided" in {
val initialToken : Option[String] = None
val fileTokenRefreshable = MockFileTokenAuthRefreshable(FileTokenConfiguration(cachedAccessToken = initialToken, tokenPath = "/tmp/token", refreshInterval = 100.milliseconds))
fileTokenRefreshable.accessToken.nonEmpty must beTrue
}

"Refresh the token after the refresh interval" in {
val initialToken = "cachedToken"
val fileTokenRefreshable = MockFileTokenAuthRefreshable(FileTokenConfiguration(Some(initialToken), "/tmp/token", 100.milliseconds))
fileTokenRefreshable.accessToken shouldEqual initialToken

Thread.sleep(150)
val refreshed = fileTokenRefreshable.accessToken
refreshed shouldNotEqual initialToken

Thread.sleep(150)
fileTokenRefreshable.accessToken shouldNotEqual refreshed
}
}
}
43 changes: 43 additions & 0 deletions client/src/test/scala/skuber/config/SkuberConfigSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package skuber.config

import com.typesafe.config.ConfigFactory
import org.specs2.mutable.Specification

import scala.concurrent.duration.{Duration, DurationInt}

class SkuberConfigSpec extends Specification {
"This is a specification for the 'SkuberConfigSpec' class".txt

"SkuberConfig" should {
"in-cluster" should {
"refresh token interval defaults to 5 min if no configuration provided" in {
val refreshTokenInterval = SkuberConfig.load().getDuration("in-cluster.refresh-token-interval")
refreshTokenInterval shouldEqual 5.minutes
}

"refresh token interval value provided by the configuration" in {
val appConfig = ConfigFactory.parseString(
"""
|skuber.in-cluster.refresh-token-interval = 100ms
""".stripMargin)
.withFallback(ConfigFactory.load())

val refreshTokenInterval = SkuberConfig.load(appConfig).getDuration("in-cluster.refresh-token-interval")
refreshTokenInterval shouldEqual 100.milliseconds
}
}
"watch-continuously" should {
"defaults are provided" in {
val skuberConfig = SkuberConfig.load()
val watchContinuouslyRequestTimeout: Duration = skuberConfig.getDuration("watch-continuously.request-timeout")
watchContinuouslyRequestTimeout shouldEqual 30.seconds

val watchContinuouslyIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.idle-timeout")
watchContinuouslyIdleTimeout shouldEqual 60.seconds

val watchPoolIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.pool-idle-timeout")
watchPoolIdleTimeout shouldEqual 30.seconds
}
}
}
}
15 changes: 15 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ Initiailly Skuber tries out-of-cluster methods in sequence (stops on first succe

If all above fails Skuber tries [in-cluster configuration method](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod)

### In Cluster configuration

Since kubernetes 1.21 the service account tokens have changed to bound service account tokens (See: [Bound Service Account Token Volume](https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#bound-service-account-token-volume)). The service account token needs to be refreshed and reloaded periodically from disk.
Skuber by default reloads the token from disk every 5 minutes.

The refresh token interval can be changed updating the following configuration property:

```config
skuber {
in-config {
refresh-token-interval = 5m
}
}
```



### Security
Expand Down

0 comments on commit 7cce0fd

Please sign in to comment.