Skip to content

Commit

Permalink
Do not buffer entire body in Http Cache (#4076)
Browse files Browse the repository at this point in the history
Co-authored-by: BoD <[email protected]>
Co-authored-by: Martin Bonnin <[email protected]>
  • Loading branch information
BoD and martinbonnin authored May 11, 2022
1 parent a20964c commit 0f50117
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 62 deletions.
4 changes: 2 additions & 2 deletions apollo-http-cache/api/apollo-http-cache.api
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ public abstract interface class com/apollographql/apollo3/cache/http/ApolloHttpC
public abstract fun clearAll ()V
public abstract fun read (Ljava/lang/String;)Lcom/apollographql/apollo3/api/http/HttpResponse;
public abstract fun remove (Ljava/lang/String;)V
public abstract fun write (Lcom/apollographql/apollo3/api/http/HttpResponse;Ljava/lang/String;)V
public abstract fun write (Lcom/apollographql/apollo3/api/http/HttpResponse;Ljava/lang/String;)Lcom/apollographql/apollo3/api/http/HttpResponse;
}

public final class com/apollographql/apollo3/cache/http/CachingHttpInterceptor : com/apollographql/apollo3/network/http/HttpInterceptor {
Expand Down Expand Up @@ -39,7 +39,7 @@ public final class com/apollographql/apollo3/cache/http/DiskLruHttpCache : com/a
public final fun delete ()V
public fun read (Ljava/lang/String;)Lcom/apollographql/apollo3/api/http/HttpResponse;
public fun remove (Ljava/lang/String;)V
public fun write (Lcom/apollographql/apollo3/api/http/HttpResponse;Ljava/lang/String;)V
public fun write (Lcom/apollographql/apollo3/api/http/HttpResponse;Ljava/lang/String;)Lcom/apollographql/apollo3/api/http/HttpResponse;
}

public final class com/apollographql/apollo3/cache/http/DiskLruHttpCache$Companion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ interface ApolloHttpCache {

/**
* Store the [response] with the given [cacheKey] into the cache.
* Note: the response's body is not consumed nor closed.
* The response's body is not consumed nor closed.
* @return a new [HttpResponse] whose body, when read, will write the contents to the cache.
*/
fun write(response: HttpResponse, cacheKey: String)
fun write(response: HttpResponse, cacheKey: String): HttpResponse

@Throws(IOException::class)
fun clearAll()

@Throws(IOException::class)
fun remove(cacheKey: String)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class CachingHttpInterceptor(

override suspend fun intercept(request: HttpRequest, chain: HttpInterceptorChain): HttpResponse {
val policy = request.headers.valueOf(CACHE_FETCH_POLICY_HEADER) ?: defaultPolicy(request)
val cacheKey = cacheKey(request)

val cacheKey = request.headers.valueOf(CACHE_KEY_HEADER)!!
when (policy) {
CACHE_FIRST -> {
val cacheException: ApolloException
Expand Down Expand Up @@ -110,7 +109,7 @@ class CachingHttpInterceptor(
if (response.statusCode in 200..299 && !doNotStore) {
// Note: this write may fail if the same cacheKey is being stored by another thread.
// This is OK though: the other thread will be the one that stores it in the cache (see issue #3664).
lruHttpCache.write(
return lruHttpCache.write(
response.newBuilder()
.addHeaders(
listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import com.apollographql.apollo3.api.http.HttpHeader
import com.apollographql.apollo3.api.http.HttpResponse
import com.apollographql.apollo3.cache.http.internal.DiskLruCache
import com.squareup.moshi.Moshi
import okio.Buffer
import okio.FileSystem
import okio.Sink
import okio.Source
import okio.Timeout
import okio.buffer
import java.io.File
import java.io.IOException
Expand Down Expand Up @@ -46,16 +50,13 @@ class DiskLruHttpCache(private val fileSystem: FileSystem, private val directory

/**
* Store the [response] with the given [cacheKey] into the cache.
* Note: the response's body is not consumed nor closed.
* A new [HttpResponse] is returned whose body, when read, will write the contents to the cache.
* The response's body is not consumed nor closed.
*/
override fun write(response: HttpResponse, cacheKey: String) {
override fun write(response: HttpResponse, cacheKey: String): HttpResponse {
val editor = cacheLock.read {
cache.edit(cacheKey)
}

if (editor == null) {
return
}
} ?: return response

try {
editor.newSink(ENTRY_HEADERS).buffer().use {
Expand All @@ -69,15 +70,14 @@ class DiskLruHttpCache(private val fileSystem: FileSystem, private val directory
)
adapter.toJson(it, map)
}
editor.newSink(ENTRY_BODY).buffer().use {
val responseBody = response.body
if (responseBody != null) {
it.writeAll(responseBody.peek())
}
}
editor.commit()
val bodySink = editor.newSink(ENTRY_BODY)
return HttpResponse.Builder(response.statusCode).apply {
headers(response.headers)
response.body?.let { body(ProxySource(it, bodySink, editor).buffer()) }
}.build()
} catch (e: Exception) {
editor.abort()
return response
}
}

Expand All @@ -104,6 +104,71 @@ class DiskLruHttpCache(private val fileSystem: FileSystem, private val directory
}
}

/**
* A [Source] that writes to the given cache sink as it is read.
*
* It commits all successful reads, even if they do not read until EOF. This is so that we can cache Json with extra trailing whitespace.
* If an error happens when reading the original source or writing to the cache sink, the edit is aborted.
* The commit or abort is done on [close].
*/
private class ProxySource(
private val originalSource: Source,
private val sink: Sink,
private val cacheEditor: DiskLruCache.Editor,
) : Source {

private val buffer = Buffer()
private var hasClosedAndCommitted: Boolean = false
private var hasReadError: Boolean = false

override fun read(sink: Buffer, byteCount: Long): Long {
val read = try {
originalSource.read(buffer, byteCount)
} catch (e: Exception) {
hasReadError = true
throw e
}

if (read == -1L) {
// We're at EOF
return -1L
}
try {
buffer.peek().readAll(this.sink)
} catch (e: Exception) {
hasReadError = true
}
try {
sink.writeAll(buffer)
} catch (e: Exception) {
hasReadError = true
throw e
}
return read
}

override fun close() {
if (!hasClosedAndCommitted) {
try {
sink.close()
if (hasReadError) {
cacheEditor.abort()
} else {
cacheEditor.commit()
}
} catch (e: Exception) {
// Silently ignore cache write errors
} finally {
hasClosedAndCommitted = true
}
originalSource.close()
}
}

override fun timeout(): Timeout = originalSource.timeout()
}


companion object {
private const val VERSION = 99991
private const val ENTRY_HEADERS = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ import com.apollographql.apollo3.api.Mutation
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.api.Query
import com.apollographql.apollo3.api.Subscription
import com.apollographql.apollo3.api.http.HttpRequest
import com.apollographql.apollo3.api.http.HttpResponse
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import com.apollographql.apollo3.network.http.HttpInfo
import com.apollographql.apollo3.network.http.HttpInterceptor
import com.apollographql.apollo3.network.http.HttpInterceptorChain
import com.apollographql.apollo3.network.http.HttpNetworkTransport
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onEach
import java.io.File

enum class HttpFetchPolicy {
Expand Down Expand Up @@ -54,28 +60,43 @@ fun ApolloClient.Builder.httpCache(
directory: File,
maxSize: Long,
): ApolloClient.Builder {

return addHttpInterceptor(
CachingHttpInterceptor(
directory = directory,
maxSize = maxSize,
)
val cachingHttpInterceptor = CachingHttpInterceptor(
directory = directory,
maxSize = maxSize,
)
var cacheKey: String? = null
return addHttpInterceptor(object : HttpInterceptor {
override suspend fun intercept(request: HttpRequest, chain: HttpInterceptorChain): HttpResponse {
cacheKey = CachingHttpInterceptor.cacheKey(request)
return chain.proceed(request.newBuilder().addHeader(CachingHttpInterceptor.CACHE_KEY_HEADER, cacheKey!!).build())
}
}).addHttpInterceptor(
cachingHttpInterceptor
).addInterceptor(object : ApolloInterceptor {
override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
return chain.proceed(request.newBuilder()
.addHttpHeader(
CachingHttpInterceptor.CACHE_OPERATION_TYPE_HEADER,
when (request.operation) {
is Query<*> -> "query"
is Mutation<*> -> "mutation"
is Subscription<*> -> "subscription"
else -> error("Unknown operation type")
}
)
.build()
)
return chain.proceed(
request.newBuilder()
.addHttpHeader(
CachingHttpInterceptor.CACHE_OPERATION_TYPE_HEADER,
when (request.operation) {
is Query<*> -> "query"
is Mutation<*> -> "mutation"
is Subscription<*> -> "subscription"
else -> error("Unknown operation type")
}
)
.build()
).catch { throwable ->
// Revert caching of responses with errors
cacheKey?.let { cachingHttpInterceptor.cache.remove(it) }
throw throwable
}.onEach { response ->
// Revert caching of responses with errors
if (response.hasErrors()) {
cacheKey?.let { cachingHttpInterceptor.cache.remove(it) }
}
}
}

})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,6 @@ internal class DiskLruCache(
}
}
}

}

inner class Entry internal constructor(val key: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith

@Suppress("BlockingMethodInNonBlockingContext")
class CachingHttpInterceptorTest {
private lateinit var mockServer: MockServer
private lateinit var interceptor: CachingHttpInterceptor
Expand All @@ -37,16 +38,22 @@ class CachingHttpInterceptorTest {

@Test
fun successResponsesAreCached() {
mockServer.enqueue(MockResponse(statusCode = 200, body = "success"))
val body = "success"
mockServer.enqueue(MockResponse(statusCode = 200, body = body))

runBlocking {
val request = HttpRequest.Builder(
method = HttpMethod.Get,
url = mockServer.url(),
).build()
)
.withCacheKey()
.build()

var response = interceptor.intercept(request, chain)
assertEquals("success", response.body?.readUtf8())
assertEquals(body, response.body?.readUtf8())

// Cache is committed when the body is closed
response.body?.close()

// 2nd request should hit the cache
response = interceptor.intercept(
Expand All @@ -55,7 +62,7 @@ class CachingHttpInterceptorTest {
.build(),
chain
)
assertEquals("success", response.body?.readUtf8())
assertEquals(body, response.body?.readUtf8())
assertEquals("true", response.headers.valueOf(CachingHttpInterceptor.FROM_CACHE))
}
}
Expand All @@ -68,7 +75,9 @@ class CachingHttpInterceptorTest {
val request = HttpRequest.Builder(
method = HttpMethod.Get,
url = mockServer.url(),
).build()
)
.withCacheKey()
.build()

// Warm the cache
val response = interceptor.intercept(request, chain)
Expand All @@ -88,17 +97,21 @@ class CachingHttpInterceptorTest {

@Test
fun timeoutWorks() {
mockServer.enqueue(MockResponse(statusCode = 200, body = "success"))

val body = "success"
mockServer.enqueue(MockResponse(statusCode = 200, body = body))
runBlocking {
val request = HttpRequest.Builder(
method = HttpMethod.Get,
url = mockServer.url(),
).build()
)
.withCacheKey()
.build()

// Warm the cache
var response = interceptor.intercept(request, chain)
assertEquals("success", response.body?.readUtf8())
assertEquals(body, response.body?.readUtf8())
// Cache is committed when the body is closed
response.body?.close()

// 2nd request should hit the cache
response = interceptor.intercept(
Expand All @@ -107,7 +120,9 @@ class CachingHttpInterceptorTest {
.build(),
chain
)
assertEquals("success", response.body?.readUtf8())
assertEquals(body, response.body?.readUtf8())
// Cache is committed when the body is closed
response.body?.close()

delay(1000)
// 3rd request with a 500ms timeout should miss
Expand Down Expand Up @@ -136,7 +151,9 @@ class CachingHttpInterceptorTest {
val request = HttpRequest.Builder(
method = HttpMethod.Get,
url = mockServer.url(),
).build()
)
.withCacheKey()
.build()

val response = interceptor.intercept(request, chain)
assertEquals("success", response.body?.readUtf8())
Expand All @@ -147,6 +164,11 @@ class CachingHttpInterceptorTest {
}
}

private fun HttpRequest.Builder.withCacheKey(): HttpRequest.Builder {
val cacheKey = CachingHttpInterceptor.cacheKey(build())
return addHeader(CachingHttpInterceptor.CACHE_KEY_HEADER, cacheKey)
}

private class TestHttpInterceptorChain : HttpInterceptorChain {
val engine = DefaultHttpEngine()

Expand Down
2 changes: 2 additions & 0 deletions apollo-mpp-utils/api/apollo-mpp-utils.api
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public final class com/apollographql/apollo3/mpp/Platform : java/lang/Enum {
public final class com/apollographql/apollo3/mpp/UtilsKt {
public static final fun assertMainThreadOnNative ()V
public static final fun currentThreadId ()Ljava/lang/String;
public static final fun currentThreadName ()Ljava/lang/String;
public static final fun currentTimeFormatted ()Ljava/lang/String;
public static final fun currentTimeMillis ()J
public static final fun ensureNeverFrozen (Ljava/lang/Object;)V
public static final fun freeze (Ljava/lang/Object;)V
Expand Down
Loading

0 comments on commit 0f50117

Please sign in to comment.