From c849318a8e0684c87ec73fc8ab5745d6bc9303de Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 27 May 2024 10:28:20 +0100 Subject: [PATCH] upgrade akka libs downgrade some akka libs because scala 2.12 is not supported on newer version avoid using package private Akka S3Exception class add EOL copy old S3Exception code from Alpakka format scalafmt some compile issues Update FunctionPullingContainerProxy.scala --- LICENSE.txt | 4 ++ common/scala/build.gradle | 6 +-- .../DockerToActivationFileLogStore.scala | 2 +- .../logging/SplunkLogStore.scala | 7 ++- .../openwhisk/core/database/Batcher.scala | 2 + .../openwhisk/core/database/StoreUtils.scala | 2 +- .../core/database/s3/S3AttachmentStore.scala | 2 +- .../core/database/s3/S3Exception.scala | 50 +++++++++++++++++++ .../v2/FunctionPullingContainerProxy.scala | 2 +- .../openwhisk/core/invoker/Invoker.scala | 2 +- settings.gradle | 8 +-- 11 files changed, 71 insertions(+), 16 deletions(-) create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala diff --git a/LICENSE.txt b/LICENSE.txt index 933501710f8..1781c2d8021 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -213,6 +213,10 @@ Spray Caching 1.3.4 (io.spray:spray-caching_2.11:1.3.4 - http://spray.io/documen License included at licenses/LICENSE-spray.txt, or https://github.com/spray/spray/blob/master/LICENSE Copyright (C) 2011-2015 the spray project +common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala +is based on https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala + Copyright (C) 2016-2019 Lightbend Inc. + This product bundles the files gradlew and gradlew.bat from Gradle v5.5 which are distributed under the Apache License, Version 2.0. For details see ./gradlew and ./gradlew.bat. diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 3c165e635d7..0745a56a11e 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -40,7 +40,7 @@ dependencies { exclude group: 'org.scala-lang', module: 'scala-compiler' exclude group: 'org.scala-lang', module: 'scala-reflect' } - api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.5" + api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.6" api "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.3.0" api "com.typesafe.akka:akka-actor_${gradle.scala.depVersion}:${gradle.akka.version}" api "com.typesafe.akka:akka-stream_${gradle.scala.depVersion}:${gradle.akka.version}" @@ -53,7 +53,7 @@ dependencies { api "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}" api "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}" - api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:1.1.2" + api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:3.0.4" api "ch.qos.logback:logback-classic:1.2.11" api "org.slf4j:jcl-over-slf4j:1.7.25" @@ -95,7 +95,7 @@ dependencies { api "io.reactivex:rxjava-reactive-streams:1.2.1" - api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") { + api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:3.0.4") { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http exclude group: 'com.fasterxml.jackson.core' exclude group: 'com.fasterxml.jackson.dataformat' diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala index 83267ba3e72..6680fae1d05 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -174,7 +174,7 @@ object OwSink { */ def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])( strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = { - Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) => + Sink.fromGraph(GraphDSL.createGraph(first, second)((_, _)) { implicit b => (s1, s2) => import GraphDSL.Implicits._ val d = b.add(strategy(2)) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala index a65fe205087..ca5ac256a03 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala @@ -33,7 +33,6 @@ import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.headers.Authorization import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.OverflowStrategy import akka.stream.QueueOfferResult import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep @@ -73,7 +72,7 @@ case class SplunkLogStoreConfig(host: String, disableSNI: Boolean) case class SplunkResponse(results: Vector[JsObject]) object SplunkResponseJsonProtocol extends DefaultJsonProtocol { - implicit val orderFormat = jsonFormat1(SplunkResponse) + implicit val orderFormat: RootJsonFormat[SplunkResponse] = jsonFormat1(SplunkResponse) } /** @@ -180,7 +179,7 @@ class SplunkLogStore( //based on http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html val queue = Source - .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew) + .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests) .via(httpFlow.getOrElse(defaultHttpFlow)) .toMat(Sink.foreach({ case ((Success(resp), p)) => p.success(resp) @@ -190,7 +189,7 @@ class SplunkLogStore( def queueRequest(request: HttpRequest): Future[HttpResponse] = { val responsePromise = Promise[HttpResponse]() - queue.offer(request -> responsePromise).flatMap { + queue.offer(request -> responsePromise) match { case QueueOfferResult.Enqueued => responsePromise.future case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later.")) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala index 4f72fe226e2..b2adef5fb2d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.database import akka.Done import akka.actor.ActorSystem +import scala.annotation.nowarn import scala.collection.immutable.Queue import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -59,6 +60,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Se CompletionStrategy.immediately } + @nowarn("msg=deprecated") private val stream = Source .actorRef[(T, Promise[R])]( completionMatcher = cm, diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala index 6a0c5df5770..cb4f7eff335 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala @@ -84,7 +84,7 @@ private[database] object StoreUtils { def combinedSink[T](dest: Sink[ByteString, Future[T]])( implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = { - Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) { + Sink.fromGraph(GraphDSL.createGraph(digestSink(), lengthSink(), dest)(combineResult) { implicit builder => (dgs, ls, dests) => import GraphDSL.Implicits._ diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala index 10c65ea967a..8311a54c0f8 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala @@ -26,7 +26,7 @@ import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.alpakka.s3.headers.CannedAcl import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings} +import akka.stream.alpakka.s3.{S3Attributes, S3Headers, S3Settings} import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString import com.typesafe.config.Config diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala new file mode 100644 index 00000000000..4112e579dd5 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2016-2019 Lightbend Inc. + */ + +package org.apache.openwhisk.core.database.s3 + +import scala.util.Try +import scala.xml.{Elem, XML} + +/** + * Exception thrown by S3 operations. + * + * Copied from https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala + */ +private[s3] class S3Exception(val code: String, val message: String, val requestId: String, val hostId: String) + extends RuntimeException(message) { + + def this(xmlResponse: Elem) = + this( + (xmlResponse \ "Code").text, + (xmlResponse \ "Message").text, + (xmlResponse \ "RequestID").text, + (xmlResponse \ "HostID").text) + + def this(response: String) = + this( + Try(XML.loadString(response)).getOrElse( + -{response}--)) + + override def toString: String = + s"${super.toString} (Code: $code, RequestID: $requestId, HostID: $hostId)" + +} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala index b0fa73f35f3..ce08ee78e7d 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala @@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2 import java.net.InetSocketAddress import java.time.Instant import akka.actor.Status.{Failure => FailureMessage} -import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash} +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash} import akka.event.Logging.InfoLevel import akka.io.{IO, Tcp} import akka.pattern.pipe diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 1eeb65c16a8..bf899ef5330 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -130,7 +130,7 @@ object Invoker { } // load values for the required properties from the environment - implicit val config = new WhiskConfig(requiredProperties, optionalProperties) + implicit val config: WhiskConfig = new WhiskConfig(requiredProperties, optionalProperties) def abort(message: String) = { logger.error(this, message)(TransactionId.invoker) diff --git a/settings.gradle b/settings.gradle index d7e9f361299..da6af71c93a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -99,10 +99,10 @@ gradle.ext.scalafmt = [ config: new File(rootProject.projectDir, '.scalafmt.conf') ] -gradle.ext.akka = [version : '2.6.12'] -gradle.ext.akka_kafka = [version : '2.0.5'] -gradle.ext.akka_http = [version : '10.2.4'] -gradle.ext.akka_management = [version : '1.0.5'] +gradle.ext.akka = [version : '2.6.21'] +gradle.ext.akka_kafka = [version : '2.1.1'] +gradle.ext.akka_http = [version : '10.2.10'] +gradle.ext.akka_management = [version : '1.1.4'] gradle.ext.curator = [version : '4.3.0'] gradle.ext.kube_client = [version: '4.10.3']