diff --git a/client/src/main/resources/reference.conf b/client/src/main/resources/reference.conf index 05b5abf1..cedf2ffa 100644 --- a/client/src/main/resources/reference.conf +++ b/client/src/main/resources/reference.conf @@ -32,4 +32,8 @@ skuber { # reclaim the unused resources. pool-idle-timeout = 30s } + + in-cluster { + refresh-token-interval = 5m + } } \ No newline at end of file diff --git a/client/src/main/scala/skuber/api/Configuration.scala b/client/src/main/scala/skuber/api/Configuration.scala index 298dcfa5..ba5617c2 100644 --- a/client/src/main/scala/skuber/api/Configuration.scala +++ b/client/src/main/scala/skuber/api/Configuration.scala @@ -1,16 +1,19 @@ package skuber.api +import org.yaml.snakeyaml.Yaml +import skuber.Namespace +import skuber.api.client._ +import skuber.api.client.token.{FileTokenAuthRefreshable, FileTokenConfiguration} +import skuber.config.SkuberConfig + import java.net.URL import java.time.Instant import java.time.format.DateTimeFormatter -import scala.collection.JavaConverters._ -import scala.util.Try -import scala.util.Failure import java.util.{Base64, Date} -import org.yaml.snakeyaml.Yaml -import skuber.Namespace -import skuber.api.client._ +import scala.collection.JavaConverters._ +import scala.concurrent.duration.{Duration, DurationInt} import scala.io.Source +import scala.util.{Failure, Try} /** * @author David O'Riordan @@ -229,7 +232,6 @@ object Configuration { * https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/util/ClientBuilder.java#L134 */ lazy val inClusterConfig: Try[Configuration] = { - val rootK8sFolder = "/var/run/secrets/kubernetes.io/serviceaccount" val tokenPath = s"$rootK8sFolder/token" val namespacePath = s"$rootK8sFolder/namespace" @@ -251,6 +253,8 @@ object Configuration { //"Expected to load root CA config from %s, but got err: %v", rootCAFile, err) lazy val ca: Option[PathOrData] = if (Files.exists(Paths.get(caPath))) Some(Left(caPath)) else None + lazy val refreshTokenInterval: Duration = SkuberConfig.load().getDuration("in-config.refresh-token-interval", 5.minutes) + for { host <- maybeHost port <- maybePort @@ -258,7 +262,9 @@ object Configuration { namespace <- maybeNamespace hostPort = s"https://$host${if (port.length > 0) ":" + port else ""}" cluster = Cluster(server = hostPort, certificateAuthority = ca) - ctx = Context(cluster, TokenAuth(token), Namespace.forName(namespace)) + ctx = Context(cluster = cluster, + authInfo = FileTokenAuthRefreshable(FileTokenConfiguration(cachedAccessToken= Some(token), tokenPath = tokenPath, refreshTokenInterval)), + namespace = Namespace.forName(namespace)) } yield Configuration(clusters = Map("default" -> cluster), contexts = Map("default" -> ctx), currentContext = ctx) @@ -297,5 +303,4 @@ object Configuration { } } } - } diff --git a/client/src/main/scala/skuber/api/client/exec/PodExecImpl.scala b/client/src/main/scala/skuber/api/client/exec/PodExecImpl.scala index abd921e3..85b750d1 100644 --- a/client/src/main/scala/skuber/api/client/exec/PodExecImpl.scala +++ b/client/src/main/scala/skuber/api/client/exec/PodExecImpl.scala @@ -127,7 +127,7 @@ object PodExecImpl { requestContext.log.info(s"Connected to container ${containerPrintName} of pod ${podName}") close.future.foreach { _ => requestContext.log.info(s"Close the connection of container ${containerPrintName} of pod ${podName}") - promise.success(None) + promise.trySuccess(None) } } Future.sequence(Seq(connected, close.future, promise.future)).map { _ => () } diff --git a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala index 9213e0ad..3bda0e46 100644 --- a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala +++ b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala @@ -20,8 +20,11 @@ import skuber.api.watch.{LongPollingPool, Watch, WatchSource} import skuber.json.PlayJsonSupportForAkkaHttp._ import skuber.json.format.apiobj.statusReads import skuber.json.format.{apiVersionsFormatReads, deleteOptionsFmt, namespaceListFmt} + import javax.net.ssl.SSLContext import skuber.apiextensions.CustomResourceDefinition.Scope +import skuber.config.SkuberConfig + import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -673,7 +676,6 @@ class KubernetesClientImpl private[client] (val requestMaker: (Uri, HttpMethod) response.discardEntityBytes().future.map(done => ()) } } - } object KubernetesClientImpl { @@ -681,19 +683,7 @@ object KubernetesClientImpl { def apply(k8sContext: Context, logConfig: LoggingConfig, closeHook: Option[() => Unit], appConfig: Config) (implicit actorSystem: ActorSystem): KubernetesClientImpl = { - appConfig.checkValid(ConfigFactory.defaultReference(), "skuber") - - def getSkuberConfig[T](key: String, fromConfig: String => Option[T], default: T): T = { - val skuberConfigKey = s"skuber.$key" - if (appConfig.getIsNull(skuberConfigKey)) { - default - } else { - fromConfig(skuberConfigKey) match { - case None => default - case Some(t) => t - } - } - } + val skuberConfig = SkuberConfig.load(appConfig) def dispatcherFromConfig(configKey: String): Option[ExecutionContext] = if (appConfig.getString(configKey).isEmpty) { None @@ -701,16 +691,13 @@ object KubernetesClientImpl { Some(actorSystem.dispatchers.lookup(appConfig.getString(configKey))) } - implicit val dispatcher: ExecutionContext = getSkuberConfig("akka.dispatcher", dispatcherFromConfig, actorSystem.dispatcher) - - def durationFomConfig(configKey: String): Option[Duration] = Some(Duration.fromNanos(appConfig.getDuration(configKey).toNanos)) - - val watchIdleTimeout: Duration = getSkuberConfig("watch.idle-timeout", durationFomConfig, Duration.Inf) - val podLogIdleTimeout: Duration = getSkuberConfig("pod-log.idle-timeout", durationFomConfig, Duration.Inf) + implicit val dispatcher: ExecutionContext = skuberConfig.getSkuberConfig("akka.dispatcher", dispatcherFromConfig, actorSystem.dispatcher) - val watchContinuouslyRequestTimeout: Duration = getSkuberConfig("watch-continuously.request-timeout", durationFomConfig, 30.seconds) - val watchContinuouslyIdleTimeout: Duration = getSkuberConfig("watch-continuously.idle-timeout", durationFomConfig, 60.seconds) - val watchPoolIdleTimeout: Duration = getSkuberConfig("watch-continuously.pool-idle-timeout", durationFomConfig, 60.seconds) + val watchIdleTimeout: Duration = skuberConfig.getDuration("watch.idle-timeout", Duration.Inf) + val podLogIdleTimeout: Duration = skuberConfig.getDuration("pod-log.idle-timeout", Duration.Inf) + val watchContinuouslyRequestTimeout: Duration = skuberConfig.getDuration("watch-continuously.request-timeout", 30.seconds) + val watchContinuouslyIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.idle-timeout", 60.seconds) + val watchPoolIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.pool-idle-timeout", 60.seconds) //The watch idle timeout needs to be greater than watch api request timeout require(watchContinuouslyIdleTimeout > watchContinuouslyRequestTimeout) diff --git a/client/src/main/scala/skuber/api/client/token/FileTokenAuthRefreshable.scala b/client/src/main/scala/skuber/api/client/token/FileTokenAuthRefreshable.scala new file mode 100644 index 00000000..17ae0648 --- /dev/null +++ b/client/src/main/scala/skuber/api/client/token/FileTokenAuthRefreshable.scala @@ -0,0 +1,61 @@ +package skuber.api.client.token + +import org.joda.time.DateTime +import skuber.K8SException +import skuber.api.client.{AuthProviderRefreshableAuth, Status} + +import scala.concurrent.duration.{Duration, DurationInt} +import scala.util.{Failure, Success} + +final case class FileTokenAuthRefreshable(config: FileTokenConfiguration) extends TokenAuthRefreshable with FileReaderComponent {} + +final case class FileTokenConfiguration( + cachedAccessToken: Option[String], + tokenPath: String, + refreshInterval: Duration = 5.minutes, +) + +trait TokenAuthRefreshable extends AuthProviderRefreshableAuth { self: ContentReaderComponent => + val config: FileTokenConfiguration + + private val refreshInterval: Duration = config.refreshInterval + @volatile private var cachedToken: Option[RefreshableToken] = config.cachedAccessToken.map(buildRefreshableToken) + + private val tokenPath: String = config.tokenPath + + override def name: String = "file-token" + override def toString: String = """FileTokenAuthRefreshable(accessToken=)""".stripMargin + + override def refreshToken: RefreshableToken = { + val refreshedToken = buildRefreshableToken(generateToken) + cachedToken = Some(refreshedToken) + refreshedToken + } + + override def generateToken: String = { + val maybeToken = contentReader.read(tokenPath) + maybeToken match { + case Success(token) => token + case Failure(e) => throw new K8SException(Status(reason = Option(e.getMessage))) + } + } + + override def isTokenExpired(refreshableToken: RefreshableToken): Boolean = + refreshableToken.expiry.isBefore(System.currentTimeMillis) + + override def accessToken: String = this.synchronized { + cachedToken match { + case Some(token) if isTokenExpired(token) => + refreshToken.accessToken + case None => + refreshToken.accessToken + case Some(token) => + token.accessToken + } + } + + private def buildRefreshableToken(accessToken: String): RefreshableToken = { + RefreshableToken(accessToken, DateTime.now.plus(refreshInterval.toMillis)) + } +} + diff --git a/client/src/main/scala/skuber/api/client/token/package.scala b/client/src/main/scala/skuber/api/client/token/package.scala new file mode 100644 index 00000000..0033a66b --- /dev/null +++ b/client/src/main/scala/skuber/api/client/token/package.scala @@ -0,0 +1,26 @@ +package skuber.api.client + +import scala.io.Source +import scala.util.Try + +package object token { + trait ContentReaderComponent { + val contentReader: ContentReader + + trait ContentReader { + def read(filePath: String): Try[String] + } + } + + trait FileReaderComponent extends ContentReaderComponent { + val contentReader: ContentReader = new FileContentReader + + class FileContentReader extends ContentReader { + def read(filePath: String): Try[String] = for { + source <- Try(Source.fromFile(filePath, "utf-8")) + content <- Try(source.getLines().mkString("\n")) + _ <- Try(source.close()) + } yield content + } + } +} diff --git a/client/src/main/scala/skuber/config/SkuberConfig.scala b/client/src/main/scala/skuber/config/SkuberConfig.scala new file mode 100644 index 00000000..dcd3aeff --- /dev/null +++ b/client/src/main/scala/skuber/config/SkuberConfig.scala @@ -0,0 +1,32 @@ +package skuber.config + +import com.typesafe.config.{Config, ConfigFactory} +import skuber.config.SkuberConfig.skuberKeyPath + +import scala.concurrent.duration.Duration + +case class SkuberConfig(appConfig: Config) { + def getSkuberConfig[T](key: String, fromConfig: String => Option[T], default: T): T = { + val skuberConfigKey = s"$skuberKeyPath.$key" + if (appConfig.getIsNull(skuberConfigKey)) { + default + } else { + fromConfig(skuberConfigKey) match { + case None => default + case Some(t) => t + } + } + } + + def getDuration(configKey: String, default: Duration = Duration.Inf): Duration = getSkuberConfig(configKey, durationFromConfig, default) + def durationFromConfig(configKey: String): Option[Duration] = Some(Duration.fromNanos(appConfig.getDuration(configKey).toNanos)) +} + +object SkuberConfig { + final val skuberKeyPath = "skuber" + + def load(appConfig: Config = ConfigFactory.load()): SkuberConfig = { + appConfig.checkValid(ConfigFactory.defaultReference(), skuberKeyPath) + SkuberConfig(appConfig) + } +} diff --git a/client/src/test/scala/skuber/api/client/token/FileTokenAuthRefreshableSpec.scala b/client/src/test/scala/skuber/api/client/token/FileTokenAuthRefreshableSpec.scala new file mode 100644 index 00000000..b381930b --- /dev/null +++ b/client/src/test/scala/skuber/api/client/token/FileTokenAuthRefreshableSpec.scala @@ -0,0 +1,42 @@ +package skuber.api.client.token + +import org.joda.time.DateTime +import org.specs2.mutable.Specification + +import scala.concurrent.duration.DurationInt +import scala.util.Try + +class FileTokenAuthRefreshableSpec extends Specification { + "This is a specification for the 'FileTokenAuthRefreshable' class".txt + + trait MockFileReaderComponent extends ContentReaderComponent { + val contentReader: ContentReader = new MockFileReaderComponent + + class MockFileReaderComponent extends ContentReader { + def read(filePath: String): Try[String] = Try(DateTime.now.toString()) + } + } + + final case class MockFileTokenAuthRefreshable(config: FileTokenConfiguration) extends TokenAuthRefreshable with MockFileReaderComponent {} + + "FileTokenAuthRefreshable" should { + "Retrieve the token if none provided" in { + val initialToken : Option[String] = None + val fileTokenRefreshable = MockFileTokenAuthRefreshable(FileTokenConfiguration(cachedAccessToken = initialToken, tokenPath = "/tmp/token", refreshInterval = 100.milliseconds)) + fileTokenRefreshable.accessToken.nonEmpty must beTrue + } + + "Refresh the token after the refresh interval" in { + val initialToken = "cachedToken" + val fileTokenRefreshable = MockFileTokenAuthRefreshable(FileTokenConfiguration(Some(initialToken), "/tmp/token", 100.milliseconds)) + fileTokenRefreshable.accessToken shouldEqual initialToken + + Thread.sleep(150) + val refreshed = fileTokenRefreshable.accessToken + refreshed shouldNotEqual initialToken + + Thread.sleep(150) + fileTokenRefreshable.accessToken shouldNotEqual refreshed + } + } +} diff --git a/client/src/test/scala/skuber/config/SkuberConfigSpec.scala b/client/src/test/scala/skuber/config/SkuberConfigSpec.scala new file mode 100644 index 00000000..6f453ef5 --- /dev/null +++ b/client/src/test/scala/skuber/config/SkuberConfigSpec.scala @@ -0,0 +1,43 @@ +package skuber.config + +import com.typesafe.config.ConfigFactory +import org.specs2.mutable.Specification + +import scala.concurrent.duration.{Duration, DurationInt} + +class SkuberConfigSpec extends Specification { + "This is a specification for the 'SkuberConfigSpec' class".txt + + "SkuberConfig" should { + "in-cluster" should { + "refresh token interval defaults to 5 min if no configuration provided" in { + val refreshTokenInterval = SkuberConfig.load().getDuration("in-cluster.refresh-token-interval") + refreshTokenInterval shouldEqual 5.minutes + } + + "refresh token interval value provided by the configuration" in { + val appConfig = ConfigFactory.parseString( + """ + |skuber.in-cluster.refresh-token-interval = 100ms + """.stripMargin) + .withFallback(ConfigFactory.load()) + + val refreshTokenInterval = SkuberConfig.load(appConfig).getDuration("in-cluster.refresh-token-interval") + refreshTokenInterval shouldEqual 100.milliseconds + } + } + "watch-continuously" should { + "defaults are provided" in { + val skuberConfig = SkuberConfig.load() + val watchContinuouslyRequestTimeout: Duration = skuberConfig.getDuration("watch-continuously.request-timeout") + watchContinuouslyRequestTimeout shouldEqual 30.seconds + + val watchContinuouslyIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.idle-timeout") + watchContinuouslyIdleTimeout shouldEqual 60.seconds + + val watchPoolIdleTimeout: Duration = skuberConfig.getDuration("watch-continuously.pool-idle-timeout") + watchPoolIdleTimeout shouldEqual 30.seconds + } + } + } +} diff --git a/docs/README.md b/docs/README.md index 08c91ed7..417373b5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -105,6 +105,21 @@ Initiailly Skuber tries out-of-cluster methods in sequence (stops on first succe If all above fails Skuber tries [in-cluster configuration method](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod) +### In Cluster configuration + +Since kubernetes 1.21 the service account tokens have changed to bound service account tokens (See: [Bound Service Account Token Volume](https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#bound-service-account-token-volume)). The service account token needs to be refreshed and reloaded periodically from disk. +Skuber by default reloads the token from disk every 5 minutes. + +The refresh token interval can be changed updating the following configuration property: + +```config +skuber { + in-config { + refresh-token-interval = 5m + } +} +``` + ### Security