Skip to content

Commit

Permalink
Merge pull request #56 from guardian/suppress-caffeine-exceptions-by-…
Browse files Browse the repository at this point in the history
…allowing-fetch-results-to-indicate-missing-keys

Avoid Caffeine exceptions on missing values
  • Loading branch information
rtyley authored Jul 23, 2024
2 parents fee003c + cbc4304 commit 65040d9
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package com.gu.etagcaching.aws.sdkv2.s3
import com.gu.etagcaching.Endo
import com.gu.etagcaching.aws.s3.ObjectId
import com.gu.etagcaching.aws.sdkv2.s3.response.Transformer
import com.gu.etagcaching.fetching.{ETaggedData, Fetching}
import com.gu.etagcaching.fetching.{ETaggedData, Fetching, Missing, MissingOrETagged}
import software.amazon.awssdk.core.internal.util.ThrowableUtils
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, S3Exception}
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, NoSuchKeyException, S3Exception}

import java.net.HttpURLConnection.HTTP_NOT_MODIFIED
import java.net.HttpURLConnection.{HTTP_NOT_FOUND, HTTP_NOT_MODIFIED}
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -20,7 +20,7 @@ case class S3ObjectFetching[Response](s3Client: S3AsyncClient, transformer: Tran
private def performFetch(
resourceId: ObjectId,
reqModifier: Endo[GetObjectRequest.Builder] = identity,
)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = {
)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] = {
val requestBuilder = GetObjectRequest.builder().bucket(resourceId.bucket).key(resourceId.key)
val request = reqModifier(requestBuilder).build()
s3Client.getObject(request, transformer.asyncResponseTransformer())
Expand All @@ -29,11 +29,14 @@ case class S3ObjectFetching[Response](s3Client: S3AsyncClient, transformer: Tran
wrapWithETag,
ThrowableUtils.getRootCause // see https://github.com/guardian/ophan/commit/49fa22176
)
.recover {
case e: NoSuchKeyException if e.statusCode == HTTP_NOT_FOUND => Missing
}
}

def fetch(key: ObjectId)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = performFetch(key)
def fetch(key: ObjectId)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] = performFetch(key)

def fetchOnlyIfETagChanged(key: ObjectId, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] =
def fetchOnlyIfETagChanged(key: ObjectId, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
performFetch(key, _.ifNoneMatch(eTag)).map(Some(_)).recover {
case e: S3Exception if e.statusCode == HTTP_NOT_MODIFIED => None // no fresh download because the ETag matched!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.gu.etagcaching.aws.s3.ObjectId
import com.gu.etagcaching.aws.sdkv2.s3.ExampleParser.parseFruit
import com.gu.etagcaching.aws.sdkv2.s3.S3ClientForS3Mock.createS3clientFor
import com.gu.etagcaching.aws.sdkv2.s3.response.Transformer.Bytes
import org.scalatest.BeforeAndAfter
import org.scalatest.{BeforeAndAfter, OptionValues}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -20,8 +20,9 @@ import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

class S3ObjectFetchingTest extends AnyFlatSpec with Matchers with ScalaFutures with IntegrationPatience with BeforeAndAfter {
class S3ObjectFetchingTest extends AnyFlatSpec with Matchers with ScalaFutures with OptionValues with IntegrationPatience with BeforeAndAfter {
val ExampleS3Object: ObjectId = ObjectId("test-bucket", "path")
val ExampleMissingS3Object: ObjectId = ObjectId("test-bucket", "nothing-should-be-here")

val s3Mock: S3MockContainer = new S3MockContainer("latest").withInitialBuckets(ExampleS3Object.bucket)
before(s3Mock.start())
Expand All @@ -43,12 +44,14 @@ class S3ObjectFetchingTest extends AnyFlatSpec with Matchers with ScalaFutures w
)

uploadFile("banana.xml.gz")
fruitCache.get(ExampleS3Object).futureValue.colour shouldBe "yellow"
fruitCache.get(ExampleS3Object).futureValue.colour shouldBe "yellow"
fruitCache.get(ExampleS3Object).futureValue.value.colour shouldBe "yellow"
fruitCache.get(ExampleS3Object).futureValue.value.colour shouldBe "yellow"

uploadFile("kiwi.xml.gz")
fruitCache.get(ExampleS3Object).futureValue.colour shouldBe "green"
fruitCache.get(ExampleS3Object).futureValue.value.colour shouldBe "green"
// Note that the value is correct, without us explicitly clearing the cache - ETag-checking saved us!

fruitCache.get(ExampleMissingS3Object).futureValue shouldBe None
}

private def uploadFile(demoFile: String): Unit = {
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/com/gu/etagcaching/ETagCache.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.gu.etagcaching

import com.github.blemale.scaffeine.Scaffeine
import com.gu.etagcaching.fetching.ETaggedData
import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine}
import com.gu.etagcaching.fetching.{ETaggedData, Missing, MissingOrETagged}

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -34,13 +34,16 @@ class ETagCache[K, V](
configureCache: ConfigCache
)(implicit ec: ExecutionContext) {

private val cache = configureCache(Scaffeine()).buildAsyncFuture[K, ETaggedData[V]](
private val cache: AsyncLoadingCache[K, MissingOrETagged[V]] = configureCache(Scaffeine()).buildAsyncFuture[K, MissingOrETagged[V]](
loader = loading.fetchAndParse,
reloadLoader = Some(loading.fetchThenParseIfNecessary)
)
reloadLoader = Some(
(key: K, old: MissingOrETagged[V]) => old match {
case Missing => loading.fetchAndParse(key)
case oldETaggedData: ETaggedData[V] => loading.fetchThenParseIfNecessary(key, oldETaggedData)
}
))

private val read = freshnessPolicy.on(cache)

def get(key: K): Future[V] = read(key).map(_.result)

def get(key: K): Future[Option[V]] = read(key).map(_.toOption)
}
10 changes: 5 additions & 5 deletions core/src/main/scala/com/gu/etagcaching/Loading.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.gu.etagcaching

import com.gu.etagcaching.fetching.{ETaggedData, Fetching}
import com.gu.etagcaching.fetching.{ETaggedData, Fetching, MissingOrETagged}

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -17,22 +17,22 @@ import scala.concurrent.{ExecutionContext, Future}
* @tparam V The 'value' for the key - a parsed representation of whatever was in the resource data.
*/
trait Loading[K, V] {
def fetchAndParse(k: K)(implicit ec: ExecutionContext): Future[ETaggedData[V]]
def fetchAndParse(k: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[V]]

/**
* When we have ''old'' `ETaggedData`, we can send the `ETag` with the Fetch request, and the server will return
* a blank HTTP 304 `Not Modified` response if the content hasn't changed - which means we DO NOT need to parse
* any new data, and can just reuse our old data, saving us CPU time and network bandwidth.
*/
def fetchThenParseIfNecessary(k: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[ETaggedData[V]]
def fetchThenParseIfNecessary(k: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]]
}

object Loading {
def by[K, Response, V](fetching: Fetching[K, Response])(parse: Response => V): Loading[K, V] = new Loading[K, V] {
def fetchAndParse(key: K)(implicit ec: ExecutionContext): Future[ETaggedData[V]] =
def fetchAndParse(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
fetching.fetch(key).map(_.map(parse))

def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[ETaggedData[V]] =
def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
fetching.fetchOnlyIfETagChanged(key, oldV.eTag).map {
case None => oldV // we got HTTP 304 'NOT MODIFIED': there's no new data - old data is still valid
case Some(freshResponse) => freshResponse.map(parse)
Expand Down

This file was deleted.

18 changes: 9 additions & 9 deletions core/src/main/scala/com/gu/etagcaching/fetching/Fetching.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import com.gu.etagcaching.fetching.Fetching._

import java.time.{Duration, Instant}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.Try

trait Fetching[K, Response] {
def fetch(key: K)(implicit ec: ExecutionContext): Future[ETaggedData[Response]]
def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]]

def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]]
def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]]

def timing(
attemptWith: Duration => Unit = _ => (),
Expand Down Expand Up @@ -57,28 +57,28 @@ object Fetching {
resultF
}

override def fetch(key: K)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] =
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
time(underlying.fetch(key))(_ => FullFetch)

override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] =
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
time(underlying.fetchOnlyIfETagChanged(key, eTag))(_.map(_ => FullFetch).getOrElse(NotModified))
}

private case class KeyAdapter[K, UnderlyingK, Response](underlying: Fetching[UnderlyingK, Response])(f: K => UnderlyingK)
extends Fetching[K, Response] {
override def fetch(key: K)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] =
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
underlying.fetch(f(key))

override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] =
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
underlying.fetchOnlyIfETagChanged(f(key), eTag)
}

private case class ResponseMapper[K, UnderlyingResponse, Response](underlying: Fetching[K, UnderlyingResponse])(f: UnderlyingResponse => Response)
extends Fetching[K, Response] {
override def fetch(key: K)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] =
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
underlying.fetch(key).map(_.map(f))

override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] =
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
underlying.fetchOnlyIfETagChanged(key, eTag).map(_.map(_.map(f)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.gu.etagcaching.fetching

sealed trait MissingOrETagged[+T] {
def map[S](f: T => S): MissingOrETagged[S]
def toOption: Option[T]
}

case object Missing extends MissingOrETagged[Nothing] {
override def map[S](f: Nothing => S): MissingOrETagged[S] = Missing
override def toOption: Option[Nothing] = None
}

/**
* @param eTag https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
*/
case class ETaggedData[T](eTag: String, result: T) extends MissingOrETagged[T] {
override def map[S](f: T => S): ETaggedData[S] = copy(result = f(result))
override def toOption: Option[T] = Some(result)
}

0 comments on commit 65040d9

Please sign in to comment.