Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop loading entire file into memory to fix OOM errors #2274

Merged
merged 2 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ data class OutcomeSummary(val matcher: MutableMap<TestOutcome, Int> = mutableMap
}

fun assertContainsUploads(input: String, vararg uploads: String) = uploads.forEach {
assertThat(input).contains("Uploading [$it]")
assertThat(input).containsMatch("Uploading( file)? \\[$it]".toPattern())
}

fun SuiteOverview.assertTestCountMatches(
Expand Down
10 changes: 10 additions & 0 deletions test_runner/src/main/kotlin/ftl/adapter/GcStorageFileUpload.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ftl.adapter

import ftl.api.RemoteStorage
import ftl.client.google.gcStorageUpload

object GcStorageFileUpload :
RemoteStorage.FileUpload,
(RemoteStorage.Dir, RemoteStorage.File) -> String by { dir, file ->
gcStorageUpload(file.path, dir.bucket, dir.path)
}
3 changes: 2 additions & 1 deletion test_runner/src/main/kotlin/ftl/adapter/GcStorageUpload.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import ftl.client.google.gcStorageUpload
object GcStorageUpload :
RemoteStorage.Upload,
(RemoteStorage.Dir, RemoteStorage.Data) -> String by { dir, data ->
gcStorageUpload(data.path, data.bytes, dir.bucket, dir.path)
gcStorageUpload(data.path, dir.bucket, dir.path, data.bytes)
}

8 changes: 8 additions & 0 deletions test_runner/src/main/kotlin/ftl/api/RemoteStorage.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ftl.api

import ftl.adapter.GcStorageExists
import ftl.adapter.GcStorageFileUpload
import ftl.adapter.GcStorageUpload

val uploadToRemoteStorage: RemoteStorage.Upload get() = GcStorageUpload
val uploadFileToRemoteStorage: RemoteStorage.FileUpload get() = GcStorageFileUpload
val existRemoteStorage: RemoteStorage.Exist get() = GcStorageExists

object RemoteStorage {
Expand Down Expand Up @@ -36,7 +38,13 @@ object RemoteStorage {
}
}

data class File(
val path: String
)

interface Exist : (Dir) -> Boolean

interface Upload : (Dir, Data) -> String

interface FileUpload : (Dir, File) -> String
}
72 changes: 50 additions & 22 deletions test_runner/src/main/kotlin/ftl/client/google/GcStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.google.cloud.storage.StorageOptions
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper
import com.google.common.annotations.VisibleForTesting
import flank.common.join
import flank.common.log
import ftl.args.IArgs
import ftl.config.FtlConstants
import ftl.config.FtlConstants.GCS_PREFIX
Expand All @@ -18,8 +19,8 @@ import ftl.util.runWithProgress
import java.io.File
import java.io.FileOutputStream
import java.net.URI
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap

object GcStorage {
Expand Down Expand Up @@ -48,17 +49,6 @@ object GcStorage {
}
}

@VisibleForTesting
internal fun upload(file: String, rootGcsBucket: String, runGcsPath: String): String {
if (file.startsWith(GCS_PREFIX)) return file
return upload(
filePath = file,
fileBytes = Files.readAllBytes(Paths.get(file)),
rootGcsBucket = rootGcsBucket,
runGcsPath = runGcsPath
)
}

fun uploadJunitXml(testResultXml: String, args: IArgs) {
if (args.smartFlankGcsPath.isBlank() || args.smartFlankDisableUpload) return

Expand All @@ -77,9 +67,9 @@ object GcStorage {
@VisibleForTesting
internal fun upload(
filePath: String,
fileBytes: ByteArray,
rootGcsBucket: String,
runGcsPath: String
runGcsPath: String,
fileBytes: ByteArray? = null
): String {
val file = File(filePath)
return uploadCache.computeIfAbsent("$runGcsPath-${file.absolutePath}") {
Expand All @@ -92,11 +82,18 @@ object GcStorage {
}

// 404 Not Found error when rootGcsBucket does not exist
fileBytes.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
when {
fileBytes != null -> fileBytes.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
else -> file.uploadWithProgress(
bucket = rootGcsBucket,
path = validGcsPath,
name = file.name
)
}
GCS_PREFIX + join(rootGcsBucket, validGcsPath)
}
}
Expand All @@ -114,6 +111,37 @@ object GcStorage {
)
}

private fun File.uploadWithProgress(
bucket: String,
path: String,
name: String
) {
// TODO #2136 handle messages with state
runWithProgress(
startMessage = "Uploading file [$name] to ${GCS_STORAGE_LINK + join(bucket, path).replace(name, "")}..",
adamvduke marked this conversation as resolved.
Show resolved Hide resolved
action = {
val blobInfo = BlobInfo.newBuilder(bucket, path).build()
if (this.length() < 1_000_000) { // 1_000_000 ~= 1MB
log("Uploaded blob")
val bytes = Files.readAllBytes(this.toPath())
storage.create(blobInfo, bytes)
return@runWithProgress
}

storage.writer(blobInfo).use { writer ->
val buffer = ByteArray(10240)
Files.newInputStream(this.toPath()).use { input ->
var limit: Int
while (input.read(buffer).also { limit = it } >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, limit))
}
}
}
},
onError = { throw FlankGeneralError("Error on uploading $name\nCause: $it") }
)
}

fun download(gcsUriString: String, ignoreError: Boolean = false): String {
val gcsURI = URI.create(gcsUriString)
val bucket = gcsURI.authority
Expand Down Expand Up @@ -152,10 +180,10 @@ object GcStorage {

internal fun gcStorageUpload(
filePath: String,
fileBytes: ByteArray,
rootGcsBucket: String,
runGcsPath: String
) = if (filePath.startsWith(GCS_PREFIX)) filePath else GcStorage.upload(filePath, fileBytes, rootGcsBucket, runGcsPath)
runGcsPath: String,
fileBytes: ByteArray? = null,
) = if (filePath.startsWith(GCS_PREFIX)) filePath else GcStorage.upload(filePath, rootGcsBucket, runGcsPath, fileBytes)

internal fun gcStorageExist(rootGcsBucket: String, runGcsPath: String) = GcStorage.exist(rootGcsBucket, runGcsPath)

Expand Down
8 changes: 3 additions & 5 deletions test_runner/src/main/kotlin/ftl/util/FileReference.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package ftl.util
import ftl.api.FileReference
import ftl.api.RemoteStorage
import ftl.api.downloadFileReference
import ftl.api.uploadToRemoteStorage
import ftl.api.uploadFileToRemoteStorage
import ftl.args.IArgs
import ftl.config.FtlConstants
import java.nio.file.Files
import java.nio.file.Paths

fun String.asFileReference(): FileReference =
if (startsWith(FtlConstants.GCS_PREFIX)) FileReference(remote = this) else FileReference(local = this)
Expand All @@ -28,8 +26,8 @@ fun FileReference.uploadIfNeeded(
): FileReference = if (remote.isNotBlank()) this else copy(remote = upload(local, rootGcsBucket, runGcsPath))

fun upload(file: String, rootGcsBucket: String, runGcsPath: String): String {
return uploadToRemoteStorage(
return uploadFileToRemoteStorage(
RemoteStorage.Dir(rootGcsBucket, runGcsPath),
RemoteStorage.Data(file, Files.readAllBytes(Paths.get(file)))
RemoteStorage.File(file)
)
}