diff --git a/build.sbt b/build.sbt index 665e2d408830..c2951916e06c 100644 --- a/build.sbt +++ b/build.sbt @@ -69,6 +69,7 @@ val common = library("common") identityModel, capiAws, okhttp, + "software.amazon.awssdk" % "s3" % "2.20.96", ) ++ jackson, ) .settings( diff --git a/common/app/concurrent/FutureSemaphore.scala b/common/app/concurrent/FutureSemaphore.scala index d8db0ef11033..d2bea1c90979 100644 --- a/common/app/concurrent/FutureSemaphore.scala +++ b/common/app/concurrent/FutureSemaphore.scala @@ -16,6 +16,18 @@ class FutureSemaphore(maxOperations: Int) { Future.failed(new FutureSemaphore.TooManyOperationsInProgress()) } } + + def execute[A](task: => A)(implicit ec: ExecutionContext): A = { + if (semaphore.tryAcquire()) { + try { + task + } finally { + semaphore.release() + } + } else { + throw new FutureSemaphore.TooManyOperationsInProgress() + } + } } object FutureSemaphore { diff --git a/common/app/metrics/FrontendMetrics.scala b/common/app/metrics/FrontendMetrics.scala index ac7a660fd7a0..e5573406bffc 100644 --- a/common/app/metrics/FrontendMetrics.scala +++ b/common/app/metrics/FrontendMetrics.scala @@ -1,12 +1,12 @@ package metrics import java.util.concurrent.atomic.AtomicLong - import com.amazonaws.services.cloudwatch.model.StandardUnit import common.{Box, StopWatch} import model.diagnostics.CloudWatch import org.joda.time.DateTime -import scala.concurrent.Future + +import scala.concurrent.{ExecutionContext, Future} import scala.util.Try sealed trait DataPoint { @@ -141,6 +141,15 @@ object DurationMetric { metric.recordDuration(stopWatch.elapsed.toDouble) result } + + def withMetricsF[A](metric: DurationMetric)(block: => Future[A])(implicit ec: ExecutionContext): Future[A] = { + val stopWatch: StopWatch = new StopWatch + val result = block + result.onComplete { _ => + metric.recordDuration(stopWatch.elapsed.toDouble) + } + result + } } case class SampledDataPoint(value: Double, sampleTime: DateTime) extends DataPoint { diff --git a/common/app/services/S3.scala b/common/app/services/S3.scala index 08dfe3a034ac..cc49f4504272 100644 --- a/common/app/services/S3.scala +++ b/common/app/services/S3.scala @@ -1,18 +1,24 @@ package services -import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} - -import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client} import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead} import com.amazonaws.services.s3.model._ +import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client} import com.amazonaws.util.StringInputStream import common.GuLogging import conf.Configuration import model.PressedPageType import org.joda.time.DateTime +import play.api.libs.json.{JsValue, Json} +import software.amazon.awssdk.core.async.AsyncResponseTransformer +import utils.AWSv2 +import software.amazon.awssdk.{services => awssdkV2} +import java.io._ +import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import scala.compat.java8.FutureConverters.CompletionStageOps +import scala.concurrent.{ExecutionContext, Future} import scala.io.{Codec, Source} +import scala.util.{Failure, Success, Using} trait S3 extends GuLogging { @@ -47,16 +53,43 @@ trait S3 extends GuLogging { log.warn("not found at %s - %s" format (bucket, key)) None case e: AmazonS3Exception => - val errorMsg = s"Unable to fetch S3 object (key: $key)" - val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." - log.error(errorMsg, e) - println(errorMsg + " \n" + hintMsg) + logS3Exception(key, e) None case e: Exception => throw e } } + private def logS3Exception[T](key: String, e: Throwable): Unit = { + val errorMsg = s"Unable to fetch S3 object (key: $bucket $key)" + val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." + log.error(errorMsg, e) + println(errorMsg + " \n" + hintMsg) + } + + private def withS3ResultV2[T]( + key: String, + )(action: InputStream => T)(implicit ec: ExecutionContext): Future[Option[T]] = { + val request: awssdkV2.s3.model.GetObjectRequest = + awssdkV2.s3.model.GetObjectRequest.builder().key(key).bucket(bucket).build() + + AWSv2.S3Async + .getObject( + request, + AsyncResponseTransformer.toBlockingInputStream[awssdkV2.s3.model.GetObjectResponse], + ) + .toScala + .map { responseBytes => + action(new GZIPInputStream(responseBytes)) + } + .transform { + case Success(t) => Success(Some(t)) + case Failure(e) => + logS3Exception(key, e) + Success(None) + } + } + def get(key: String)(implicit codec: Codec): Option[String] = withS3Result(key) { result => Source.fromInputStream(result.getObjectContent).mkString @@ -96,6 +129,12 @@ trait S3 extends GuLogging { Source.fromInputStream(new GZIPInputStream(result.getObjectContent)).mkString } + def getGzippedV2(key: String)(implicit codec: Codec, ec: ExecutionContext): Future[Option[JsValue]] = { + withS3ResultV2(key) { inputStream => + Using(inputStream)(Json.parse).get + } + } + private def putGzipped( key: String, value: String, diff --git a/common/app/services/fronts/FrontJsonFapi.scala b/common/app/services/fronts/FrontJsonFapi.scala index 63e2dff45483..9c2efa3bd33d 100644 --- a/common/app/services/fronts/FrontJsonFapi.scala +++ b/common/app/services/fronts/FrontJsonFapi.scala @@ -5,10 +5,10 @@ import concurrent.{BlockingOperations, FutureSemaphore} import conf.Configuration import metrics.DurationMetric import model.{PressedPage, PressedPageType} -import play.api.libs.json.Json +import play.api.libs.json.{JsValue, Json} import services.S3 -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, blocking} trait FrontJsonFapi extends GuLogging { lazy val stage: String = Configuration.facia.stage.toUpperCase @@ -28,26 +28,25 @@ trait FrontJsonFapi extends GuLogging { pressedPageFromS3(getAddressForPath(path, pageType.suffix)) } - private def parsePressedPage( - jsonStringOpt: Option[String], - )(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] = - futureSemaphore.execute { - blockingOperations.executeBlocking { - jsonStringOpt.map { jsonString => - DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) { - // This operation is run in the thread pool since it is very CPU intensive - Json.parse(jsonString).as[PressedPage] - } - } + private def deserialisePressedPage( + jsValueOpt: Option[JsValue], + )(implicit ec: ExecutionContext): Future[Option[PressedPage]] = +// futureSemaphore.execute { +// blockingOperations.executeBlocking { + Future.successful(jsValueOpt.map { jsValue => + DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) { + // This operation is run in the thread pool since it is very CPU intensive + jsValue.as[PressedPage] } - } + }) +// } +// } - private def loadPressedPageFromS3(path: String) = - blockingOperations.executeBlocking { - DurationMetric.withMetrics(FaciaPressMetrics.FrontDownloadLatency) { - S3.getGzipped(path) - } + private def loadPressedPageFromS3(path: String)(implicit ec: ExecutionContext): Future[Option[JsValue]] = { + DurationMetric.withMetricsF(FaciaPressMetrics.FrontDownloadLatency) { + S3.getGzippedV2(path) } + } private def pressedPageFromS3( path: String, @@ -55,7 +54,7 @@ trait FrontJsonFapi extends GuLogging { errorLoggingF(s"FrontJsonFapi.pressedPageFromS3 $path") { for { s3FrontData <- loadPressedPageFromS3(path) - pressedPage <- parsePressedPage(s3FrontData) + pressedPage <- deserialisePressedPage(s3FrontData) } yield pressedPage } diff --git a/common/app/utils/AWSv2.scala b/common/app/utils/AWSv2.scala new file mode 100644 index 000000000000..342e602eb0b3 --- /dev/null +++ b/common/app/utils/AWSv2.scala @@ -0,0 +1,26 @@ +package utils + +import software.amazon.awssdk.auth.credentials._ +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.regions.Region.EU_WEST_1 +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder + +object AWSv2 { + val region: Region = EU_WEST_1 + + def credentialsForDevAndProd(devProfile: String, prodCreds: AwsCredentialsProvider): AwsCredentialsProviderChain = + AwsCredentialsProviderChain.of(prodCreds, ProfileCredentialsProvider.builder().profileName(devProfile).build()) + + lazy val credentials: AwsCredentialsProvider = + credentialsForDevAndProd("frontend", InstanceProfileCredentialsProvider.create()) + + def build[T, B <: AwsClientBuilder[B, T]](builder: B): T = + builder.credentialsProvider(credentials).region(region).build() + + val S3Async: S3AsyncClient = build[S3AsyncClient, S3AsyncClientBuilder]( + S3AsyncClient.builder().httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(250)), + ) +}