Skip to content

Commit

Permalink
Switch to async AWS SDK v2 for Facia JSON download
Browse files Browse the repository at this point in the history
Context: #26335

In this change:

* Use AWS SDK v2 and non-blocking async code to avoid blocking a thread
  while downloading the Facia JSON data from S3
* Directly transform bytes into JsValue, without constructing a `String`
  first

Note that the work done between the two metrics `FrontDownloadLatency`
& `FrontDecodingLatency` has changed slightly - the conversion to the
basic JSON model (JsValue) now occurs in `FrontDownloadLatency`, rather
than the 'decoding' step.

We could get rid of the futureSemaphore and stop using the dedicated
blocking-operations pool here, but we'll leave that to another PR.

Co-authored-by: Ravi <[email protected]>
Co-authored-by: Ioanna Kokkini <[email protected]>
  • Loading branch information
3 people authored and rtyley committed Aug 2, 2023
1 parent 34d8b3a commit 47a5501
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 30 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ val common = library("common")
identityModel,
capiAws,
okhttp,
"software.amazon.awssdk" % "s3" % "2.20.96",
) ++ jackson,
)
.settings(
Expand Down
12 changes: 12 additions & 0 deletions common/app/concurrent/FutureSemaphore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ class FutureSemaphore(maxOperations: Int) {
Future.failed(new FutureSemaphore.TooManyOperationsInProgress())
}
}

def execute[A](task: => A)(implicit ec: ExecutionContext): A = {
if (semaphore.tryAcquire()) {
try {
task
} finally {
semaphore.release()
}
} else {
throw new FutureSemaphore.TooManyOperationsInProgress()
}
}
}

object FutureSemaphore {
Expand Down
13 changes: 11 additions & 2 deletions common/app/metrics/FrontendMetrics.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package metrics

import java.util.concurrent.atomic.AtomicLong

import com.amazonaws.services.cloudwatch.model.StandardUnit
import common.{Box, StopWatch}
import model.diagnostics.CloudWatch
import org.joda.time.DateTime
import scala.concurrent.Future

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

sealed trait DataPoint {
Expand Down Expand Up @@ -141,6 +141,15 @@ object DurationMetric {
metric.recordDuration(stopWatch.elapsed.toDouble)
result
}

def withMetricsF[A](metric: DurationMetric)(block: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
val stopWatch: StopWatch = new StopWatch
val result = block
result.onComplete { _ =>
metric.recordDuration(stopWatch.elapsed.toDouble)
}
result
}
}

case class SampledDataPoint(value: Double, sampleTime: DateTime) extends DataPoint {
Expand Down
55 changes: 47 additions & 8 deletions common/app/services/S3.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package services

import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead}
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.util.StringInputStream
import common.GuLogging
import conf.Configuration
import model.PressedPageType
import org.joda.time.DateTime
import play.api.libs.json.{JsValue, Json}
import software.amazon.awssdk.core.async.AsyncResponseTransformer
import utils.AWSv2
import software.amazon.awssdk.{services => awssdkV2}

import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
import scala.io.{Codec, Source}
import scala.util.{Failure, Success, Using}

trait S3 extends GuLogging {

Expand Down Expand Up @@ -47,16 +53,43 @@ trait S3 extends GuLogging {
log.warn("not found at %s - %s" format (bucket, key))
None
case e: AmazonS3Exception =>
val errorMsg = s"Unable to fetch S3 object (key: $key)"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
logS3Exception(key, e)
None
case e: Exception =>
throw e
}
}

private def logS3Exception[T](key: String, e: Throwable): Unit = {
val errorMsg = s"Unable to fetch S3 object (key: $bucket $key)"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
}

private def withS3ResultV2[T](
key: String,
)(action: InputStream => T)(implicit ec: ExecutionContext): Future[Option[T]] = {
val request: awssdkV2.s3.model.GetObjectRequest =
awssdkV2.s3.model.GetObjectRequest.builder().key(key).bucket(bucket).build()

AWSv2.S3Async
.getObject(
request,
AsyncResponseTransformer.toBlockingInputStream[awssdkV2.s3.model.GetObjectResponse],
)
.toScala
.map { responseBytes =>
action(new GZIPInputStream(responseBytes))
}
.transform {
case Success(t) => Success(Some(t))
case Failure(e) =>
logS3Exception(key, e)
Success(None)
}
}

def get(key: String)(implicit codec: Codec): Option[String] =
withS3Result(key) { result =>
Source.fromInputStream(result.getObjectContent).mkString
Expand Down Expand Up @@ -96,6 +129,12 @@ trait S3 extends GuLogging {
Source.fromInputStream(new GZIPInputStream(result.getObjectContent)).mkString
}

def getGzippedV2(key: String)(implicit codec: Codec, ec: ExecutionContext): Future[Option[JsValue]] = {
withS3ResultV2(key) { inputStream =>
Using(inputStream)(Json.parse).get
}
}

private def putGzipped(
key: String,
value: String,
Expand Down
39 changes: 19 additions & 20 deletions common/app/services/fronts/FrontJsonFapi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import concurrent.{BlockingOperations, FutureSemaphore}
import conf.Configuration
import metrics.DurationMetric
import model.{PressedPage, PressedPageType}
import play.api.libs.json.Json
import play.api.libs.json.{JsValue, Json}
import services.S3

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

trait FrontJsonFapi extends GuLogging {
lazy val stage: String = Configuration.facia.stage.toUpperCase
Expand All @@ -28,34 +28,33 @@ trait FrontJsonFapi extends GuLogging {
pressedPageFromS3(getAddressForPath(path, pageType.suffix))
}

private def parsePressedPage(
jsonStringOpt: Option[String],
)(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] =
futureSemaphore.execute {
blockingOperations.executeBlocking {
jsonStringOpt.map { jsonString =>
DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) {
// This operation is run in the thread pool since it is very CPU intensive
Json.parse(jsonString).as[PressedPage]
}
}
private def deserialisePressedPage(
jsValueOpt: Option[JsValue],
)(implicit ec: ExecutionContext): Future[Option[PressedPage]] =
// futureSemaphore.execute {
// blockingOperations.executeBlocking {
Future.successful(jsValueOpt.map { jsValue =>
DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) {
// This operation is run in the thread pool since it is very CPU intensive
jsValue.as[PressedPage]
}
}
})
// }
// }

private def loadPressedPageFromS3(path: String) =
blockingOperations.executeBlocking {
DurationMetric.withMetrics(FaciaPressMetrics.FrontDownloadLatency) {
S3.getGzipped(path)
}
private def loadPressedPageFromS3(path: String)(implicit ec: ExecutionContext): Future[Option[JsValue]] = {
DurationMetric.withMetricsF(FaciaPressMetrics.FrontDownloadLatency) {
S3.getGzippedV2(path)
}
}

private def pressedPageFromS3(
path: String,
)(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] =
errorLoggingF(s"FrontJsonFapi.pressedPageFromS3 $path") {
for {
s3FrontData <- loadPressedPageFromS3(path)
pressedPage <- parsePressedPage(s3FrontData)
pressedPage <- deserialisePressedPage(s3FrontData)
} yield pressedPage
}

Expand Down
26 changes: 26 additions & 0 deletions common/app/utils/AWSv2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils

import software.amazon.awssdk.auth.credentials._
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.Region.EU_WEST_1
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder

object AWSv2 {
val region: Region = EU_WEST_1

def credentialsForDevAndProd(devProfile: String, prodCreds: AwsCredentialsProvider): AwsCredentialsProviderChain =
AwsCredentialsProviderChain.of(prodCreds, ProfileCredentialsProvider.builder().profileName(devProfile).build())

lazy val credentials: AwsCredentialsProvider =
credentialsForDevAndProd("frontend", InstanceProfileCredentialsProvider.create())

def build[T, B <: AwsClientBuilder[B, T]](builder: B): T =
builder.credentialsProvider(credentials).region(region).build()

val S3Async: S3AsyncClient = build[S3AsyncClient, S3AsyncClientBuilder](
S3AsyncClient.builder().httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(250)),
)
}

0 comments on commit 47a5501

Please sign in to comment.