Skip to content

Commit

Permalink
upgrade akka libs
Browse files Browse the repository at this point in the history
downgrade some akka libs because scala 2.12 is not supported on newer version

avoid using package private Akka S3Exception class

add EOL

copy old S3Exception code from Alpakka

format

scalafmt

some compile issues

Update FunctionPullingContainerProxy.scala
  • Loading branch information
pjfanning committed Sep 21, 2024
1 parent 6b1c048 commit c849318
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 16 deletions.
4 changes: 4 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ Spray Caching 1.3.4 (io.spray:spray-caching_2.11:1.3.4 - http://spray.io/documen
License included at licenses/LICENSE-spray.txt, or https://github.com/spray/spray/blob/master/LICENSE
Copyright (C) 2011-2015 the spray project <http://spray.io>

common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala
is based on https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>

This product bundles the files gradlew and gradlew.bat from Gradle v5.5
which are distributed under the Apache License, Version 2.0.
For details see ./gradlew and ./gradlew.bat.
6 changes: 3 additions & 3 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies {
exclude group: 'org.scala-lang', module: 'scala-compiler'
exclude group: 'org.scala-lang', module: 'scala-reflect'
}
api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.5"
api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.6"
api "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.3.0"
api "com.typesafe.akka:akka-actor_${gradle.scala.depVersion}:${gradle.akka.version}"
api "com.typesafe.akka:akka-stream_${gradle.scala.depVersion}:${gradle.akka.version}"
Expand All @@ -53,7 +53,7 @@ dependencies {
api "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
api "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"

api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:1.1.2"
api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:3.0.4"

api "ch.qos.logback:logback-classic:1.2.11"
api "org.slf4j:jcl-over-slf4j:1.7.25"
Expand Down Expand Up @@ -95,7 +95,7 @@ dependencies {
api "io.reactivex:rxjava-reactive-streams:1.2.1"


api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:3.0.4") {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object OwSink {
*/
def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
Sink.fromGraph(GraphDSL.createGraph(first, second)((_, _)) { implicit b => (s1, s2) =>
import GraphDSL.Implicits._
val d = b.add(strategy(2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
Expand Down Expand Up @@ -73,7 +72,7 @@ case class SplunkLogStoreConfig(host: String,
disableSNI: Boolean)
case class SplunkResponse(results: Vector[JsObject])
object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
implicit val orderFormat = jsonFormat1(SplunkResponse)
implicit val orderFormat: RootJsonFormat[SplunkResponse] = jsonFormat1(SplunkResponse)
}

/**
Expand Down Expand Up @@ -180,7 +179,7 @@ class SplunkLogStore(
//based on http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html
val queue =
Source
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests)
.via(httpFlow.getOrElse(defaultHttpFlow))
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
Expand All @@ -190,7 +189,7 @@ class SplunkLogStore(

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
queue.offer(request -> responsePromise) match {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped =>
Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.database
import akka.Done
import akka.actor.ActorSystem

import scala.annotation.nowarn
import scala.collection.immutable.Queue
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -59,6 +60,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Se
CompletionStrategy.immediately
}

@nowarn("msg=deprecated")
private val stream = Source
.actorRef[(T, Promise[R])](
completionMatcher = cm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[database] object StoreUtils {

def combinedSink[T](dest: Sink[ByteString, Future[T]])(
implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
Sink.fromGraph(GraphDSL.createGraph(digestSink(), lengthSink(), dest)(combineResult) {
implicit builder => (dgs, ls, dests) =>
import GraphDSL.Implicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.alpakka.s3.headers.CannedAcl
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings}
import akka.stream.alpakka.s3.{S3Attributes, S3Headers, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.Config
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package org.apache.openwhisk.core.database.s3

import scala.util.Try
import scala.xml.{Elem, XML}

/**
* Exception thrown by S3 operations.
*
* Copied from https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
*/
private[s3] class S3Exception(val code: String, val message: String, val requestId: String, val hostId: String)
extends RuntimeException(message) {

def this(xmlResponse: Elem) =
this(
(xmlResponse \ "Code").text,
(xmlResponse \ "Message").text,
(xmlResponse \ "RequestID").text,
(xmlResponse \ "HostID").text)

def this(response: String) =
this(
Try(XML.loadString(response)).getOrElse(
<Error><Code>-</Code><Message>{response}</Message><RequestID>-</RequestID><HostID>-</HostID></Error>))

override def toString: String =
s"${super.toString} (Code: $code, RequestID: $requestId, HostID: $hostId)"

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2
import java.net.InetSocketAddress
import java.time.Instant
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.io.{IO, Tcp}
import akka.pattern.pipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ object Invoker {
}

// load values for the required properties from the environment
implicit val config = new WhiskConfig(requiredProperties, optionalProperties)
implicit val config: WhiskConfig = new WhiskConfig(requiredProperties, optionalProperties)

def abort(message: String) = {
logger.error(this, message)(TransactionId.invoker)
Expand Down
8 changes: 4 additions & 4 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ gradle.ext.scalafmt = [
config: new File(rootProject.projectDir, '.scalafmt.conf')
]

gradle.ext.akka = [version : '2.6.12']
gradle.ext.akka_kafka = [version : '2.0.5']
gradle.ext.akka_http = [version : '10.2.4']
gradle.ext.akka_management = [version : '1.0.5']
gradle.ext.akka = [version : '2.6.21']
gradle.ext.akka_kafka = [version : '2.1.1']
gradle.ext.akka_http = [version : '10.2.10']
gradle.ext.akka_management = [version : '1.1.4']

gradle.ext.curator = [version : '4.3.0']
gradle.ext.kube_client = [version: '4.10.3']
Expand Down

0 comments on commit c849318

Please sign in to comment.