#760 Migrate from Akka to Pekko (#762)
* first cut changes

* little more progress

* few more fixes

* compiles

* compile with coverage works

* heikoseebergerAkkaHttpJsonVersion => pjfanningAkkaHttpJsonV

* fixed JacksonMapperSupport issue

* first sweep of tests working

* squbs-actormonitor tests successful

* squbs-ext tests

* squbs-httpclient tests

* testng issue

* bumped version of squbs and github actions

* version bump to 1.0.2-SNAPSHOT

* sync pull-request.yml with publish.yml
bharath12345 authored Apr 17, 2024
1 parent 4899bdd commit 2569bf5
Showing 365 changed files with 1,860 additions and 1,849 deletions.
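The bulk of this 365-file change is a mechanical rename: `akka.*` packages become `org.apache.pekko.*` in source, and top-level `akka.*` configuration keys become `pekko.*`. A minimal sketch of that transformation (purely illustrative — this is not the script or tool the commit actually used):

```java
// Illustrative sketch of the mechanical Akka -> Pekko rename; not the tool used by this commit.
class PekkoMigrate {
    // Scala/Java sources: akka.* packages move under org.apache.pekko.*
    static String migrateSource(String line) {
        return line.replaceAll("\\bakka\\.", "org.apache.pekko.");
    }

    // HOCON config: top-level akka.* keys become pekko.*
    static String migrateConfig(String line) {
        return line.replaceAll("\\bakka\\.", "pekko.");
    }

    public static void main(String[] args) {
        System.out.println(migrateSource("import akka.actor.ActorSystem"));
        // -> import org.apache.pekko.actor.ActorSystem
        System.out.println(migrateConfig("akka.actor.deployment { }"));
        // -> pekko.actor.deployment { }
    }
}
```

A blind textual rename like this would also rewrite `akka.` inside comments and string literals, which is why real migrations still need the kind of per-file review visible in the diffs below.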
6 changes: 3 additions & 3 deletions .github/workflows/publish.yml
@@ -14,14 +14,14 @@ jobs:
SCALA_VERSION: ${{ matrix.scala-version }}
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v4
- name: Setup JDK
- uses: olafurpg/setup-scala@v10
+ uses: olafurpg/setup-scala@v11
with:
java-version: [email protected]
- name: Import GPG key
id: import_gpg
- uses: crazy-max/ghaction-import-gpg@v4
+ uses: crazy-max/ghaction-import-gpg@v6
with:
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.PASSPHRASE }}
4 changes: 2 additions & 2 deletions .github/workflows/pull-request.yml
@@ -14,9 +14,9 @@ jobs:
SCALA_VERSION: ${{ matrix.scala-version }}
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v4
- name: Setup JDK
- uses: olafurpg/setup-scala@v10
+ uses: olafurpg/setup-scala@v11
with:
java-version: [email protected]
- run: sbt ++$SCALA_VERSION clean coverage test
2 changes: 1 addition & 1 deletion README.md
@@ -5,7 +5,7 @@
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.squbs/squbs-unicomplex_2.12/badge.svg?style=flat)](http://search.maven.org/#search|ga|1|g:org.squbs)
[![License](https://img.shields.io/badge/License-Apache%202.0-red.svg)](https://opensource.org/licenses/Apache-2.0)

- squbs (pronounced "skewbs") is a suite of components enabling standardization and operationalization of Akka and Akka HTTP applications/services in a large scale, managed, cloud environment. It standardizes how Akka applications are deployed in different environments and how they are hooked up to the operational environments of large, internet-scale organizations.
+ squbs (pronounced "skewbs") is a suite of components enabling standardization and operationalization of Pekko and Pekko HTTP applications/services in a large scale, managed, cloud environment. It standardizes how Pekko applications are deployed in different environments and how they are hooked up to the operational environments of large, internet-scale organizations.

## Documentation

2 changes: 1 addition & 1 deletion build.sbt
@@ -26,7 +26,7 @@ lazy val `squbs-pipeline` = project

lazy val `squbs-unicomplex` = project dependsOn (`squbs-pipeline`, `squbs-ext`)

- lazy val `squbs-testkit` = (project dependsOn `squbs-unicomplex`).enablePlugins(de.johoop.testngplugin.TestNGPlugin)
+ lazy val `squbs-testkit` = (project dependsOn `squbs-unicomplex`)//.enablePlugins(de.johoop.testngplugin.TestNGPlugin)

lazy val `squbs-zkcluster` = project dependsOn `squbs-testkit` % Test

2 changes: 1 addition & 1 deletion docs/actor-hierarchy.md
@@ -5,7 +5,7 @@

squbs sets up the actor and component hierarchy shown in the above picture to support a modular runtime for actors and services running in a squbs system.

- * **ActorSystem** - This is the akka ActorSystem. A squbs system uses one single actor system to support all services and cubes. This will ensure that we have a single control point for dispatchers running in a squbs system. The ActorSystem name is "squbs" by default but can be overridden by overriding the settings in application.conf.
+ * **ActorSystem** - This is the pekko ActorSystem. A squbs system uses one single actor system to support all services and cubes. This will ensure that we have a single control point for dispatchers running in a squbs system. The ActorSystem name is "squbs" by default but can be overridden by overriding the settings in application.conf.

* **Unicomplex** - This is the core singleton actor that manages the squbs system. It registers all cubes and communicates with the web-service actor and the cube supervisors for lifecycle management of the system. It is also responsible for starting the web-service and service-registrar actors. Applications or system components can access the `ActorRef` of `Unicomplex` by calling `Unicomplex()`

2 changes: 1 addition & 1 deletion docs/blocking-dispatcher.md
@@ -1,7 +1,7 @@

# The Blocking Dispatcher for Blocking API Calls

- This topic is not about dispatchers in general, but about squbs-specific dispatcher configurations. Please check the [Akka documentation](http://doc.akka.io/docs/akka/2.3.13/scala/dispatchers.html) for descriptions and details of dispatchers.
+ This topic is not about dispatchers in general, but about squbs-specific dispatcher configurations. Please check the [Pekko documentation](http://doc.pekko.io/docs/pekko/2.3.13/scala/dispatchers.html) for descriptions and details of dispatchers.

squbs adds another pre-configured dispatcher for use in blocking calls. Generally, these are used for synchronous calls to the database. The reference.conf defines the blocking-dispatcher as follows:

10 changes: 5 additions & 5 deletions docs/bootstrap.md
@@ -96,15 +96,15 @@ Providing `application.conf` for a cube may cause issues when multiple cubes try

## Well Known Actors

- Well known actors are just [Akka actors](http://doc.akka.io/docs/akka/2.3.13/scala/actors.html) as defined by the
- [Akka documentation](http://doc.akka.io/docs/akka/2.3.13/scala.html). They are started by a supervisor actor that is created for each cube. The supervisor carries the name of the cube. Therefore any well known actor has a path of
+ Well known actors are just [pekko actors](http://doc.pekko.io/docs/pekko/2.3.13/scala/actors.html) as defined by the
+ [pekko documentation](http://doc.pekko.io/docs/pekko/2.3.13/scala.html). They are started by a supervisor actor that is created for each cube. The supervisor carries the name of the cube. Therefore any well known actor has a path of
/<CubeName>/<ActorName> and can be looked up using the ActorSelection call under /user/<CubeName>/<ActorName>.

A well known actor can be started as a singleton actor or with a router. To declare a well known actor as a router,
add:
with-router = true
in the actor declaration. Router, dispatcher, and mailbox configuration for well known actors are done in
- reference.conf or application.conf following the Akka documentation.
+ reference.conf or application.conf following the pekko documentation.

Following is a sample cube declaration META-INF/squbs-meta.conf declaring a well known actor:

@@ -130,7 +130,7 @@ affect the routee, you need to create a separate configuration for the routees and
configure the dispatcher in the routee section as the following example.

```
- akka.actor.deployment {
+ pekko.actor.deployment {
# Router configuration
/bottlecube/lyrics {
@@ -149,7 +149,7 @@ akka.actor.deployment {
```

Router concepts, examples, and configuration, are documented in the
- [Akka documentation](http://doc.akka.io/docs/akka/2.3.13/scala/routing.html).
+ [pekko documentation](http://doc.pekko.io/docs/pekko/2.3.13/scala/routing.html).

## Services

6 changes: 3 additions & 3 deletions docs/circuitbreaker.md
@@ -2,9 +2,9 @@

### Overview

- Akka Streams and Akka HTTP are great technologies to build highly resilient systems. They provide back-pressure to ensure you do not overload the system and the slowness of one component does not cause its work queue to pile up and end up in memory leaks. But, we need another safeguard to ensure our service stays responsive in the event of external or internal failures, and have alternate paths to satisfying the requests/messages due to such failures. If a downstream service is not responding or slow, we could alternatively try another service or fetch cached results instead of back-pressuring the stream and potentially the whole system.
+ Pekko Streams and Pekko HTTP are great technologies to build highly resilient systems. They provide back-pressure to ensure you do not overload the system and the slowness of one component does not cause its work queue to pile up and end up in memory leaks. But, we need another safeguard to ensure our service stays responsive in the event of external or internal failures, and have alternate paths to satisfying the requests/messages due to such failures. If a downstream service is not responding or slow, we could alternatively try another service or fetch cached results instead of back-pressuring the stream and potentially the whole system.

- squbs introduces `CircuitBreaker` Akka Streams `GraphStage` to provide circuit breaker functionality for streams.
+ squbs introduces `CircuitBreaker` Pekko Streams `GraphStage` to provide circuit breaker functionality for streams.

### Dependency

@@ -121,7 +121,7 @@ CircuitBreakerSettings settings =

#### Failure Decider

- By default, any `Failure` from the joined `Flow` is considered a problem and causes the circuit breaker failure count to be incremented. However, `CircuitBreakerSettings` also accepts an optional `failureDecider` to decide on whether an element passed by the joined `Flow` is actually considered a failure. For instance, if Circuit Breaker is joined with an Akka HTTP flow, a `Success` Http Response with status code 500 internal server error should be considered a failure.
+ By default, any `Failure` from the joined `Flow` is considered a problem and causes the circuit breaker failure count to be incremented. However, `CircuitBreakerSettings` also accepts an optional `failureDecider` to decide on whether an element passed by the joined `Flow` is actually considered a failure. For instance, if Circuit Breaker is joined with a Pekko HTTP flow, a `Success` Http Response with status code 500 internal server error should be considered a failure.

##### Scala

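The failure-decider idea in the circuit breaker docs above can be modeled without any streaming machinery: a predicate over the response decides whether the breaker counts it as a failure even when the element itself arrived as a `Success`. A minimal sketch (statuses as plain ints; `isFailure` is an illustrative name, not the squbs API):

```java
import java.util.function.IntPredicate;

// Illustrative model of a failure decider: a successfully delivered HTTP response
// with a 5xx status should still increment the circuit breaker's failure count.
class FailureDeciderSketch {
    static final IntPredicate isFailure = status -> status >= 500;

    public static void main(String[] args) {
        System.out.println(isFailure.test(500)); // true: counts as a failure
        System.out.println(isFailure.test(200)); // false: healthy response
    }
}
```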
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -94,7 +94,7 @@ blocking-dispatcher {

## Blocking Dispatcher

- The squbs `reference.conf` declares a `blocking-dispatcher` used for blocking I/O calls. This is a standard Akka dispatcher configuration. Please see [dispatchers](http://doc.akka.io/docs/akka/2.3.13/scala/dispatchers.html) in the Akka documentation for more detail.
+ The squbs `reference.conf` declares a `blocking-dispatcher` used for blocking I/O calls. This is a standard pekko dispatcher configuration. Please see [dispatchers](http://doc.pekko.io/docs/pekko/2.3.13/scala/dispatchers.html) in the pekko documentation for more detail.

## Listeners

2 changes: 1 addition & 1 deletion docs/console.md
@@ -60,7 +60,7 @@ JSON plugins for your browser would detect and allow easy clicking on the links.
{
"fullName" : "org.squbs.admin",
"name" : "admin",
- "supervisor" : "Actor[akka://squbs/user/admin#104594558]",
+ "supervisor" : "Actor[pekko://squbs/user/admin#104594558]",
"version" : "0.7.1"
}
]
4 changes: 2 additions & 2 deletions docs/customizing-stream-control.md
@@ -1,6 +1,6 @@
# Customizing Stream Control

- Akka Streams/Reactive stream needs to be integrated with the [Runtime Lifecycle](lifecycle.md) of the server. For this, an automated or semi-automated integration is provided through the [`PerpetualStream`](perpetualstream.md) infrastructure. If you need even more fine-grained control over stream, the following sections explain such facilities.
+ Pekko Streams/Reactive stream needs to be integrated with the [Runtime Lifecycle](lifecycle.md) of the server. For this, an automated or semi-automated integration is provided through the [`PerpetualStream`](perpetualstream.md) infrastructure. If you need even more fine-grained control over stream, the following sections explain such facilities.

### Dependency

@@ -31,7 +31,7 @@ final Source aggregatedSource = new LifecycleManaged().source(inSource);

In the Scala API, the resulting source will be an aggregated source materialize to a `(M, () => ActorRef)` where `M` is the materialized type of `inSource` and `() => ActorRef` is the materialized type of the function for accessing the trigger actor which receives events from the Unicomplex, the squbs container.

- In the Java API, the resulting source will be an aggregated source materialize to a `akka.japi.Pair<M, Supplier<ActorRef>>` where `M` is the materialized type of `inSource` and `Supplier<ActorRef>` is the materialized type of the function for accessing the trigger actor. Calling the `get()` method on the `Supplier` allows access to the `ActorRef`. This `ActorRef` receives events from the Unicomplex, the squbs container.
+ In the Java API, the resulting source will be an aggregated source materialize to a `org.apache.pekko.japi.Pair<M, Supplier<ActorRef>>` where `M` is the materialized type of `inSource` and `Supplier<ActorRef>` is the materialized type of the function for accessing the trigger actor. Calling the `get()` method on the `Supplier` allows access to the `ActorRef`. This `ActorRef` receives events from the Unicomplex, the squbs container.

The aggregated source does not emit from original source until lifecycle becomes `Active`, and stop emitting element and shuts down the stream after lifecycle state becomes `Stopping`.

4 changes: 2 additions & 2 deletions docs/deduplicate.md
@@ -2,7 +2,7 @@

### Overview

- `Deduplicate` is an Akka Streams `GraphStage` to drop identical (consecutive or non-consecutive) elements in a stream.
+ `Deduplicate` is a Pekko Streams `GraphStage` to drop identical (consecutive or non-consecutive) elements in a stream.

### Dependency

@@ -14,7 +14,7 @@ Add the following dependency to your `build.sbt` or scala build file:

### Usage

- The usage is very similar to standard Akka Stream stages:
+ The usage is very similar to standard Pekko Stream stages:

```scala
val result = Source("a" :: "b" :: "b" :: "c" :: "a" :: "a" :: "a" :: "c" :: Nil).
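Semantically, the `Deduplicate` stage documented above keeps the first occurrence of each element and drops later identical ones, whether or not they are consecutive. On a finite collection, the same effect can be sketched with an insertion-ordered set (a model of the behavior, not the squbs implementation):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class DeduplicateSketch {
    // Keep first occurrences in order; drop all later duplicates.
    static List<String> deduplicate(List<String> in) {
        return new ArrayList<>(new LinkedHashSet<>(in));
    }

    public static void main(String[] args) {
        // Same input as the stream example in the docs above.
        System.out.println(deduplicate(List.of("a", "b", "b", "c", "a", "a", "a", "c")));
        // -> [a, b, c]
    }
}
```

Note that the real stage works on an unbounded stream and therefore has to hold a registry of seen elements; this finite-list model sidesteps that concern.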
8 changes: 4 additions & 4 deletions docs/flow-retry.md
@@ -2,7 +2,7 @@

### Overview

Some stream use cases may require retrying of requests after a failure response. squbs provides a `Retry` Akka Streams
Some stream use cases may require retrying of requests after a failure response. squbs provides a `Retry` pekko Streams
stage to add a retry capability to streams that need to add retries for any failing requests.

### Dependency
@@ -259,9 +259,9 @@ final BidiFlow<Pair<String, Context>,
##### Configuring the threshold for backpressure

If the joined flow keeps returning failures, `Retry` starts back pressuring when the elements waiting to be retried
- reaches to a certain threshold. By default, the threshold is equal to the internal buffer size of `Retry` Akka Stream
- `GraphStage` (please see [Akka Stream
- Attributes](https://doc.akka.io/docs/akka/current/stream/stream-composition.html#attributes)). The threshold can be
+ reaches to a certain threshold. By default, the threshold is equal to the internal buffer size of `Retry` pekko Stream
+ `GraphStage` (please see [pekko Stream
+ Attributes](https://doc.pekko.io/docs/pekko/current/stream/stream-composition.html#attributes)). The threshold can be
made independent of internal buffer size by calling `withMaxWaitingRetries`:


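Stripped of the stream plumbing and backpressure concerns discussed above, the core retry behavior is a bounded loop: attempt the operation up to a maximum number of times, return the first success, and rethrow the last failure. A hedged sketch (the attempt cap is only a loose stand-in for the stage's buffer/threshold semantics):

```java
import java.util.function.Supplier;

class RetrySketch {
    // Try op up to maxAttempts times; return the first success or rethrow the last failure.
    static <T> T retry(Supplier<T> op, int maxAttempts) {
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // remember the most recent failure for the final rethrow
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            calls[0]++;
            if (calls[0] < 3) throw new RuntimeException("transient failure");
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

Unlike this loop, the real stage retries asynchronously and starts back-pressuring upstream once too many elements are waiting to be retried.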
8 changes: 4 additions & 4 deletions docs/flow-timeout.md
@@ -2,7 +2,7 @@

### Overview

Some stream use cases may require each message in a flow to be processed within a bounded time or to send a timeout failure message instead. squbs introduces `Timeout` Akka Streams stage to add timeout functionality to streams.
Some stream use cases may require each message in a flow to be processed within a bounded time or to send a timeout failure message instead. squbs introduces `Timeout` pekko Streams stage to add timeout functionality to streams.

### Dependency

@@ -90,7 +90,7 @@ public class Timeout {
}
```

- This `BidiFlow` can be joined with any flow that takes in a `akka.japi.Pair<In, Context>` and outputs a `akka.japi.Pair<Out, Context>`.
+ This `BidiFlow` can be joined with any flow that takes in a `org.apache.pekko.japi.Pair<In, Context>` and outputs a `org.apache.pekko.japi.Pair<Out, Context>`.

```java
final Duration duration = Duration.ofSeconds(1);
@@ -191,11 +191,11 @@ Source.from(Arrays.asList("a", "b", "c"))

The `Timeout` also provides a clean up callback function to be passed in via `TimeoutSettings`. This function will be called for emitted elements that were already considered timed out.

- An example use case for this functionality is when `Timeout` is used with Akka Http client. As described in [Implications of the streaming nature of Request/Response Entities](http://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html), all http responses must be consumed or discarded. By passing a clean up callback to discard the timed out requests when they complete we avoid clogging down the stream.
+ An example use case for this functionality is when `Timeout` is used with pekko Http client. As described in [Implications of the streaming nature of Request/Response Entities](http://doc.pekko.io/docs/pekko-http/current/scala/http/implications-of-streaming-http-entity.html), all http responses must be consumed or discarded. By passing a clean up callback to discard the timed out requests when they complete we avoid clogging down the stream.

###### Scala
```scala
- val akkaHttpDiscard = (response: HttpResponse) => response.discardEntityBytes()
+ val pekkoHttpDiscard = (response: HttpResponse) => response.discardEntityBytes()

val settings =
TimeoutSettings[HttpRequest, HttpResponse, Context](1.second)
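The clean-up callback contract described in the timeout docs above can be modeled without Pekko: a result that arrives after the deadline is not emitted downstream, but is handed to the callback so its resources (such as an unconsumed HTTP response entity) can be released. A minimal sketch with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

class TimeoutSketch {
    // Emit the result if it arrived within the deadline; otherwise pass it to cleanUp and drop it.
    static <T> Optional<T> emitOrCleanUp(long elapsedMillis, long timeoutMillis,
                                         T result, Consumer<T> cleanUp) {
        if (elapsedMillis <= timeoutMillis) {
            return Optional.of(result);
        }
        cleanUp.accept(result); // e.g. discarding response entity bytes in the pekko-http case
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        System.out.println(emitOrCleanUp(500, 1000, "in-time", discarded::add));   // Optional[in-time]
        System.out.println(emitOrCleanUp(1500, 1000, "too-late", discarded::add)); // Optional.empty
        System.out.println(discarded); // [too-late]
    }
}
```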