Skip to content

Commit

Permalink
fix: stop loading entire file into memory to fix OOM errors
Browse files Browse the repository at this point in the history
  • Loading branch information
joshafeinberg committed Dec 2, 2022
1 parent 3748bce commit 8885092
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ data class OutcomeSummary(val matcher: MutableMap<TestOutcome, Int> = mutableMap
}

fun assertContainsUploads(input: String, vararg uploads: String) = uploads.forEach {
assertThat(input).contains("Uploading [$it]")
assertThat(input).containsMatch("Uploading( file)? \\[$it]".toPattern())
}

fun SuiteOverview.assertTestCountMatches(
Expand Down
10 changes: 10 additions & 0 deletions test_runner/src/main/kotlin/ftl/adapter/GcStorageFileUpload.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ftl.adapter

import ftl.api.RemoteStorage
import ftl.client.google.gcStorageUpload

object GcStorageFileUpload :
RemoteStorage.FileUpload,
(RemoteStorage.Dir, RemoteStorage.File) -> String by { dir, file ->
gcStorageUpload(file.path, dir.bucket, dir.path)
}
3 changes: 2 additions & 1 deletion test_runner/src/main/kotlin/ftl/adapter/GcStorageUpload.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import ftl.client.google.gcStorageUpload
object GcStorageUpload :
RemoteStorage.Upload,
(RemoteStorage.Dir, RemoteStorage.Data) -> String by { dir, data ->
gcStorageUpload(data.path, data.bytes, dir.bucket, dir.path)
gcStorageUpload(data.path, dir.bucket, dir.path, data.bytes)
}

8 changes: 8 additions & 0 deletions test_runner/src/main/kotlin/ftl/api/RemoteStorage.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ftl.api

import ftl.adapter.GcStorageExists
import ftl.adapter.GcStorageFileUpload
import ftl.adapter.GcStorageUpload

val uploadToRemoteStorage: RemoteStorage.Upload get() = GcStorageUpload
val uploadFileToRemoteStorage: RemoteStorage.FileUpload get() = GcStorageFileUpload
val existRemoteStorage: RemoteStorage.Exist get() = GcStorageExists

object RemoteStorage {
Expand Down Expand Up @@ -36,7 +38,13 @@ object RemoteStorage {
}
}

data class File(
val path: String
)

interface Exist : (Dir) -> Boolean

interface Upload : (Dir, Data) -> String

interface FileUpload : (Dir, File) -> String
}
72 changes: 50 additions & 22 deletions test_runner/src/main/kotlin/ftl/client/google/GcStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.google.cloud.storage.StorageOptions
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper
import com.google.common.annotations.VisibleForTesting
import flank.common.join
import flank.common.log
import ftl.args.IArgs
import ftl.config.FtlConstants
import ftl.config.FtlConstants.GCS_PREFIX
Expand All @@ -18,8 +19,8 @@ import ftl.util.runWithProgress
import java.io.File
import java.io.FileOutputStream
import java.net.URI
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap

object GcStorage {
Expand Down Expand Up @@ -48,17 +49,6 @@ object GcStorage {
}
}

@VisibleForTesting
internal fun upload(file: String, rootGcsBucket: String, runGcsPath: String): String {
if (file.startsWith(GCS_PREFIX)) return file
return upload(
filePath = file,
fileBytes = Files.readAllBytes(Paths.get(file)),
rootGcsBucket = rootGcsBucket,
runGcsPath = runGcsPath
)
}

fun uploadJunitXml(testResultXml: String, args: IArgs) {
if (args.smartFlankGcsPath.isBlank() || args.smartFlankDisableUpload) return

Expand All @@ -77,9 +67,9 @@ object GcStorage {
@VisibleForTesting
internal fun upload(
filePath: String,
fileBytes: ByteArray,
rootGcsBucket: String,
runGcsPath: String
runGcsPath: String,
fileBytes: ByteArray? = null
): String {
val file = File(filePath)
return uploadCache.computeIfAbsent("$runGcsPath-${file.absolutePath}") {
Expand All @@ -92,11 +82,18 @@ object GcStorage {
}

// 404 Not Found error when rootGcsBucket does not exist
fileBytes.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
when {
fileBytes != null -> fileBytes.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
else -> file.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
}
GCS_PREFIX + join(rootGcsBucket, validGcsPath)
}
}
Expand All @@ -114,6 +111,37 @@ object GcStorage {
)
}

private fun File.uploadWithProgress(
bucket: String,
path: String,
name: String
) {
// TODO #2136 handle messages with state
runWithProgress(
startMessage = "Uploading file [$name] to ${GCS_STORAGE_LINK + join(bucket, path).replace(name, "")}..",
action = {
val blobInfo = BlobInfo.newBuilder(bucket, path).build()
if (this.length() < 1_000_000) { // 1_000_000 ~= 1MB
log("Uploaded blob")
val bytes = Files.readAllBytes(this.toPath())
storage.create(blobInfo, bytes)
return@runWithProgress
}

storage.writer(blobInfo).use { writer ->
val buffer = ByteArray(10240)
Files.newInputStream(this.toPath()).use { input ->
var limit: Int
while (input.read(buffer).also { limit = it } >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, limit))
}
}
}
},
onError = { throw FlankGeneralError("Error on uploading $name\nCause: $it") }
)
}

fun download(gcsUriString: String, ignoreError: Boolean = false): String {
val gcsURI = URI.create(gcsUriString)
val bucket = gcsURI.authority
Expand Down Expand Up @@ -152,10 +180,10 @@ object GcStorage {

internal fun gcStorageUpload(
filePath: String,
fileBytes: ByteArray,
rootGcsBucket: String,
runGcsPath: String
) = if (filePath.startsWith(GCS_PREFIX)) filePath else GcStorage.upload(filePath, fileBytes, rootGcsBucket, runGcsPath)
runGcsPath: String,
fileBytes: ByteArray? = null,
) = if (filePath.startsWith(GCS_PREFIX)) filePath else GcStorage.upload(filePath, rootGcsBucket, runGcsPath, fileBytes)

internal fun gcStorageExist(rootGcsBucket: String, runGcsPath: String) = GcStorage.exist(rootGcsBucket, runGcsPath)

Expand Down
8 changes: 3 additions & 5 deletions test_runner/src/main/kotlin/ftl/util/FileReference.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package ftl.util
import ftl.api.FileReference
import ftl.api.RemoteStorage
import ftl.api.downloadFileReference
import ftl.api.uploadToRemoteStorage
import ftl.api.uploadFileToRemoteStorage
import ftl.args.IArgs
import ftl.config.FtlConstants
import java.nio.file.Files
import java.nio.file.Paths

fun String.asFileReference(): FileReference =
if (startsWith(FtlConstants.GCS_PREFIX)) FileReference(remote = this) else FileReference(local = this)
Expand All @@ -28,8 +26,8 @@ fun FileReference.uploadIfNeeded(
): FileReference = if (remote.isNotBlank()) this else copy(remote = upload(local, rootGcsBucket, runGcsPath))

fun upload(file: String, rootGcsBucket: String, runGcsPath: String): String {
return uploadToRemoteStorage(
return uploadFileToRemoteStorage(
RemoteStorage.Dir(rootGcsBucket, runGcsPath),
RemoteStorage.Data(file, Files.readAllBytes(Paths.get(file)))
RemoteStorage.File(file)
)
}

0 comments on commit 8885092

Please sign in to comment.