Skip to content

Commit

Permalink
Migrating from Akka to Pekko
Browse files Browse the repository at this point in the history
  • Loading branch information
ncreep committed Oct 2, 2023
1 parent 3bd43af commit cc84c12
Show file tree
Hide file tree
Showing 53 changed files with 221 additions and 223 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This example lists pods in `kube-system` namespace:
```scala
import skuber._
import skuber.json.format._
import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import scala.util.{Success, Failure}

implicit val system = ActorSystem()
Expand All @@ -43,9 +43,11 @@ This example lists pods in `kube-system` namespace:
Read the [documentation](https://skuber.co) and join [discord community](https://discord.gg/byEh56vFJR) to ask your questions!


**Note: Since Akka license is no more an "Open Source” license, akka version won't be bumped until there will be an equivalent alternative.**
**Note: Since Akka license is no longer an "Open Source” license, the Skuber project moved on to using [Apache Pekko](https://pekko.apache.org), an open-source Akka fork.**

**Currently, skuber implemented with akka 2.6.19 and the license is open-sourced.**
**To help migration from Akka to Pekko, please refer to Pekko's [migration guides](https://pekko.apache.org/docs/pekko/current/project/migration-guides.html).**

**Important: please make sure to rename your `akka` configuration keys to `pekko`. This is important when configuring, e.g., the dispatcher for the application.**


## Features
Expand Down
27 changes: 11 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,14 @@ ThisBuild / scalaVersion := currentScalaVersion

val supportedScalaVersion = Seq(scala12Version, scala13Version, scala3Version)

/**
* 2.6.19 is the last akka open source version
* To comply with other companies' legal issues, akka version wont be bumped.
* https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
*/
val akkaVersion = "2.6.19"
val pekkoVersion = "1.0.1"

val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.17.0"

val specs2 = "org.specs2" %% "specs2-core" % "4.19.2"
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.17"

val akkaStreamTestKit = ("com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion).cross(CrossVersion.for3Use2_13)
val pekkoStreamTestKit = ("org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion).cross(CrossVersion.for3Use2_13)


val snakeYaml = "org.yaml" % "snakeyaml" % "2.0"
Expand All @@ -35,13 +30,13 @@ val commonsCodec = "commons-codec" % "commons-codec" % "1.15"
val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk18on" % "1.76"


// the client API request/response handing uses Akka Http
val akkaHttp = ("com.typesafe.akka" %% "akka-http" % "10.2.9").cross(CrossVersion.for3Use2_13)
val akkaStream = ("com.typesafe.akka" %% "akka-stream" % akkaVersion).cross(CrossVersion.for3Use2_13)
val akka = ("com.typesafe.akka" %% "akka-actor" % akkaVersion).cross(CrossVersion.for3Use2_13)
// the client API request/response handing uses Pkka Http
val pekkoHttp = ("org.apache.pekko" %% "pekko-http" % "1.0.0").cross(CrossVersion.for3Use2_13)
val pekkoStream = ("org.apache.pekko" %% "pekko-stream" % pekkoVersion).cross(CrossVersion.for3Use2_13)
val pekko = ("org.apache.pekko" %% "pekko-actor" % pekkoVersion).cross(CrossVersion.for3Use2_13)

// Skuber uses akka logging, so the examples config uses the akka slf4j logger with logback backend
val akkaSlf4j = ("com.typesafe.akka" %% "akka-slf4j" % akkaVersion).cross(CrossVersion.for3Use2_13)
// Skuber uses pekko logging, so the examples config uses the pekko slf4j logger with logback backend
val pekkoSlf4j = ("org.apache.pekko" %% "pekko-slf4j" % pekkoVersion).cross(CrossVersion.for3Use2_13)
val logback = "ch.qos.logback" % "logback-classic" % "1.4.6" % Runtime

// the Json formatters are based on Play Json
Expand Down Expand Up @@ -154,16 +149,16 @@ inThisBuild(List(
lazy val skuberSettings = Seq(
name := "skuber",
libraryDependencies ++= Seq(
akkaHttp, akkaStream, playJson, snakeYaml, commonsIO, commonsCodec, bouncyCastle,
pekkoHttp, pekkoStream, playJson, snakeYaml, commonsIO, commonsCodec, bouncyCastle,
awsJavaSdkCore, awsJavaSdkSts, apacheCommonsLogging, jacksonDatabind,
scalaCheck % Test, specs2 % Test, akkaStreamTestKit % Test,
scalaCheck % Test, specs2 % Test, pekkoStreamTestKit % Test,
scalaTest % Test
).map(_.exclude("commons-logging", "commons-logging"))
)

lazy val examplesSettings = Seq(
name := "skuber-examples",
libraryDependencies ++= Seq(akka, akkaSlf4j, logback, playJson)
libraryDependencies ++= Seq(pekko, pekkoSlf4j, logback, playJson)
)

// by default run the guestbook example when executing a fat examples JAR
Expand Down
4 changes: 2 additions & 2 deletions client/src/it/scala/skuber/CustomResourceSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package skuber

import java.util.UUID.randomUUID
import akka.stream._
import akka.stream.scaladsl._
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl._
import org.scalactic.source.Position
import skuber.apiextensions.v1beta1.CustomResourceDefinition
import skuber.apiextensions.v1beta1.CustomResourceDefinition._
Expand Down
4 changes: 2 additions & 2 deletions client/src/it/scala/skuber/CustomResourceV1Spec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package skuber

import akka.stream._
import akka.stream.scaladsl._
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl._
import org.scalatest.BeforeAndAfterAll
import skuber.apiextensions.v1.CustomResourceDefinition
import org.scalatest.matchers.should.Matchers
Expand Down
4 changes: 2 additions & 2 deletions client/src/it/scala/skuber/ExecSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package skuber

import java.util.UUID.randomUUID
import akka.Done
import akka.stream.scaladsl.{Sink, Source}
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
Expand Down
8 changes: 4 additions & 4 deletions client/src/it/scala/skuber/FutureUtil.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package skuber

import akka.actor.{ActorSystem, Scheduler}
import org.apache.pekko.actor.{ActorSystem, Scheduler}
import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
import scala.concurrent.duration._
Expand All @@ -13,12 +13,12 @@ object FutureUtil {

implicit val patienceConfig: PatienceConfig = PatienceConfig(10.second)

def valueT(implicit executionContext: ExecutionContext, akkaActor: ActorSystem): T = value.withTimeout().futureValue
def valueT(implicit executionContext: ExecutionContext, pekkoActor: ActorSystem): T = value.withTimeout().futureValue

def withTimeout(timeout: FiniteDuration = 10.seconds,
cleanup: Option[T => Unit] = None)
(implicit executionContext: ExecutionContext, akkaActor: ActorSystem): Future[T] =
futureTimeout(akkaActor.scheduler, timeout, cleanup)(value)
(implicit executionContext: ExecutionContext, pekkoActor: ActorSystem): Future[T] =
futureTimeout(pekkoActor.scheduler, timeout, cleanup)(value)

def timeoutException(timeout: FiniteDuration) = new TimeoutException(s"Future timed out after ${timeout.toString()}") with NoStackTrace

Expand Down
2 changes: 1 addition & 1 deletion client/src/it/scala/skuber/K8SFixture.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package skuber

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.Outcome
import org.scalatest.flatspec.{AnyFlatSpec, FixtureAnyFlatSpec}
Expand Down
4 changes: 2 additions & 2 deletions client/src/it/scala/skuber/WatchContinuouslySpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package skuber

import java.util.UUID.randomUUID
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.pekko.stream.KillSwitches
import org.apache.pekko.stream.scaladsl.{Keep, Sink}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.concurrent.{Eventually, ScalaFutures}
Expand Down
8 changes: 4 additions & 4 deletions client/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
akka.http.client.websocket.periodic-keep-alive-max-idle = 10 seconds
akka.http.server.websocket.periodic-keep-alive-max-idle = 10 seconds
pekko.http.client.websocket.periodic-keep-alive-max-idle = 10 seconds
pekko.http.server.websocket.periodic-keep-alive-max-idle = 10 seconds

skuber {

akka {
# The ID of the dispatcher to use by Skuber. If undefined or empty the default Akka dispatcher is used.
pekko {
# The ID of the dispatcher to use by Skuber. If undefined or empty the default Pekko dispatcher is used.
dispatcher = ""
}

Expand Down
26 changes: 13 additions & 13 deletions client/src/main/scala/skuber/api/client/KubernetesClient.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package skuber.api.client

import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import play.api.libs.json.{Format, Writes}
import skuber.api.patch.Patch
import skuber.{DeleteOptions, HasStatusSubresource, LabelSelector, ListOptions, ListResource, ObjectResource, Pod, ResourceDefinition, Scale}
Expand Down Expand Up @@ -198,7 +198,7 @@ trait KubernetesClient {
* @param obj the name of the object to watch
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the type of the object to watch e.g. Pod, Deployment
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watch[O <: ObjectResource](obj: O, namespace: Option[String])(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Future[Source[WatchEvent[O], _]]

Expand All @@ -217,7 +217,7 @@ trait KubernetesClient {
* @param bufSize An optional buffer size for the returned on-the-wire representation of each modified object - normally the default is more than enough.
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the type of the resource to watch
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watch[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, namespace: Option[String] = None)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Future[Source[WatchEvent[O], _]]

Expand All @@ -233,7 +233,7 @@ trait KubernetesClient {
* @param bufSize optional buffer size for each modified object received, normally the default is more than enough
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the type of resource to watch e.g. Pod, Dpeloyment
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watchAll[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, namespace: Option[String] = None)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Future[Source[WatchEvent[O], _]]

Expand All @@ -243,7 +243,7 @@ trait KubernetesClient {
* @param obj the object resource to watch
* @tparam O the type of the resource e.g Pod
* @param namespace the namespace (defaults to currently configured namespace)
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watchContinuously[O <: ObjectResource](obj: O, namespace: Option[String])(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]

Expand All @@ -263,7 +263,7 @@ trait KubernetesClient {
* @param bufSize optional buffer size for received object updates, normally the default is more than enough
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the type of the resource
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, namespace: Option[String] = None)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]

Expand All @@ -279,7 +279,7 @@ trait KubernetesClient {
* @param bufSize optional buffer size for received object updates, normally the default is more than enough
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the type pf the resource
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, namespace: Option[String] = None)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]

Expand All @@ -292,7 +292,7 @@ trait KubernetesClient {
* @param bufsize optional buffer size for received object updates, normally the default is more than enough
* @param namespace the namespace (defaults to currently configured namespace)
* @tparam O the resource type to watch
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Pekko streams Source of WatchEvents that will be emitted
*/
def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000, namespace: Option[String] = None)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]

Expand Down Expand Up @@ -352,7 +352,7 @@ trait KubernetesClient {
def jsonMergePatch[O <: ObjectResource](obj: O, patch: String, namespace: Option[String] = None)(implicit rd: ResourceDefinition[O], fmt: Format[O], lc: LoggingContext): Future[O]

/**
* Get the logs from a pod (similar to `kubectl logs ...`). The logs are streamed using an Akka streams source
* Get the logs from a pod (similar to `kubectl logs ...`). The logs are streamed using an Pekko streams source
* @param name the name of the pod
* @param queryParams optional parameters of the request (for example container name)
* @param namespace if set this specifies the namespace of the pod (otherwise the configured namespace is used)
Expand All @@ -365,9 +365,9 @@ trait KubernetesClient {
* @param podName the name of the pod
* @param command the command to execute
* @param maybeContainerName an optional container name
* @param maybeStdin optional Akka Source for sending input to stdin for the command
* @param maybeStdout optional Akka Sink to receive output from stdout for the command
* @param maybeStderr optional Akka Sink to receive output from stderr for the command
* @param maybeStdin optional Pekko Source for sending input to stdin for the command
* @param maybeStdout optional Pekko Sink to receive output from stdout for the command
* @param maybeStderr optional Pekko Sink to receive output from stderr for the command
* @param tty optionally set tty on
* @param maybeClose if set, this can be used to close the connection to the pod by completing the promise
* @param namespace if set this specifies the namespace of the pod (otherwise the configured namespace is used)
Expand Down
18 changes: 9 additions & 9 deletions client/src/main/scala/skuber/api/client/exec/PodExecImpl.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package skuber.api.client.exec

import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpHeader, StatusCodes, Uri, ws}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.stream.SinkShape
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, Sink, Source}
import akka.util.ByteString
import akka.{Done, NotUsed}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.model.{HttpHeader, StatusCodes, Uri, ws}
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
import org.apache.pekko.http.scaladsl.{ConnectionContext, Http}
import org.apache.pekko.stream.SinkShape
import org.apache.pekko.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, Sink, Source}
import org.apache.pekko.util.ByteString
import org.apache.pekko.{Done, NotUsed}
import play.api.libs.json.JsString
import skuber.api.client.impl.KubernetesClientImpl
import skuber.api.client.{K8SException, LoggingContext, Status}
Expand Down
Loading

0 comments on commit cc84c12

Please sign in to comment.