Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-499] DRS Parallel Downloads Follow-up #7229

Merged
merged 11 commits into from
Oct 11, 2023
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package drs.localizer.downloaders

import cats.effect.{ExitCode, IO}
import cloud.nio.impl.drs.{AccessUrl, DrsResolverResponse}
import cloud.nio.impl.drs.{AccessUrl}
import com.typesafe.scalalogging.StrictLogging

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import scala.sys.process.{Process, ProcessLogger}
import scala.util.matching.Regex
import drs.localizer.ResolvedDrsUrl
import spray.json.DefaultJsonProtocol.{StringJsonFormat, listFormat, mapFormat}
import spray.json._
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 🎉 🎉


case class GetmResult(returnCode: Int, stderr: String)
/**
* Getm is a python tool that is used to download resolved DRS uris quickly and in parallel.
Expand Down Expand Up @@ -40,39 +43,25 @@ case class BulkAccessUrlDownloader(resolvedUrls : List[ResolvedDrsUrl]) extends
* @return Filepath of a getm-manifest.json that Getm can use to download multiple files in parallel.
*/
def generateJsonManifest(resolvedUrls : List[ResolvedDrsUrl]): IO[Path] = {
def toJsonString(drsResponse: DrsResolverResponse, destinationFilepath: String): String = {
//NB: trailing comma is being removed in generateJsonManifest
val accessUrl: AccessUrl = drsResponse.accessUrl.getOrElse(AccessUrl("missing", None))
drsResponse.hashes.map(_ => {
val checksum = GetmChecksum(drsResponse.hashes, accessUrl).value.getOrElse("error_calculating_checksum")
val checksumAlgorithm = GetmChecksum(drsResponse.hashes, accessUrl).getmAlgorithm
s""" {
| "url" : "${accessUrl.url}",
| "filepath" : "$destinationFilepath",
| "checksum" : "$checksum",
| "checksum-algorithm" : "$checksumAlgorithm"
| },
|""".stripMargin
}).getOrElse(
s""" {
| "url" : "${accessUrl.url}",
| "filepath" : "$destinationFilepath"
| },
|""".stripMargin
)
}
IO {
var jsonString: String = "[\n"
for (resolvedUrl <- resolvedUrls) {
jsonString += toJsonString(resolvedUrl.drsResponse, resolvedUrl.downloadDestinationPath)
}
if(jsonString.contains(',')) {
//remove trailing comma from array elements, but don't crash on empty list.
jsonString = jsonString.substring(0, jsonString.lastIndexOf(","))
}
jsonString += "\n]"
Files.write(getmManifestPath, jsonString.getBytes(StandardCharsets.UTF_8))
def resolvedUrlToJsonMap(resolvedUrl: ResolvedDrsUrl): Map[String,String] = {
val accessUrl: AccessUrl = resolvedUrl.drsResponse.accessUrl.getOrElse(AccessUrl("missing", None))
resolvedUrl.drsResponse.hashes.map{_ =>
val checksum = GetmChecksum(resolvedUrl.drsResponse.hashes, accessUrl).value.getOrElse("error_calculating_checksum")
val checksumAlgorithm = GetmChecksum(resolvedUrl.drsResponse.hashes, accessUrl).getmAlgorithm
Map(
("url", accessUrl.url),
("filepath", resolvedUrl.downloadDestinationPath),
("checksum", checksum),
("checksum-algorithm", checksumAlgorithm)
)
}.getOrElse(Map(
("url", accessUrl.url),
("filepath", resolvedUrl.downloadDestinationPath)
))
}

val jsonArray: String = resolvedUrls.map(resolved => resolvedUrlToJsonMap(resolved)).toJson.prettyPrint
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for this PR, just TOL - I wonder how many files we could handle before we wouldn't want to handle this as a single string.

Copy link
Contributor Author

@THWiseman THWiseman Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question! Would that bring us back to manual json-string-munging so we could stream bytes into a file?? ;)

Back of the napkin math: a long DRS URL might be 1,000 characters. Java strings are 40 + length bytes. Call it 1kb per DRS url.

100MB of RAM gets us ~100,000 DRS URLs. Seems like we would need some pretty extreme inputs to cause a problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would probably bring us to a streaming write using Spray - surely there's a way to stream bytes into an open files using IO. Seems like not something we'll need to worry about in the near future, though!

IO(Files.write(getmManifestPath, jsonArray.getBytes(StandardCharsets.UTF_8)))
}

def deleteJsonManifest() = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,17 @@ class BulkAccessUrlDownloaderSpec extends AnyFlatSpec with CromwellTimeoutSpec w
val threeElements: List[ResolvedDrsUrl] = List(ex1, ex2, ex3)

it should "correctly parse a collection of Access Urls into a manifest.json" in {
val expected: String =
s"""|[
| {
| "url" : "https://my.fake/url123",
| "filepath" : "path/to/local/download/dest"
| },
| {
| "url" : "https://my.fake/url1234",
| "filepath" : "path/to/local/download/dest2"
| },
| {
| "url" : "https://my.fake/url1235",
| "filepath" : "path/to/local/download/dest3"
| }
|]""".stripMargin

val expected =
"""[{
| "url": "https://my.fake/url123",
| "filepath": "path/to/local/download/dest"
|}, {
| "url": "https://my.fake/url1234",
| "filepath": "path/to/local/download/dest2"
|}, {
| "url": "https://my.fake/url1235",
| "filepath": "path/to/local/download/dest3"
|}]""".stripMargin
val downloader = BulkAccessUrlDownloader(threeElements)

val filepath: IO[Path] = downloader.generateJsonManifest(threeElements)
Expand All @@ -45,11 +40,7 @@ class BulkAccessUrlDownloaderSpec extends AnyFlatSpec with CromwellTimeoutSpec w
}

it should "properly construct empty JSON array from empty list." in {
val expected: String =
s"""|[
|
|]""".stripMargin

val expected: String = "[]"
val downloader = BulkAccessUrlDownloader(emptyList)
val filepath: IO[Path] = downloader.generateJsonManifest(emptyList)
val source = scala.io.Source.fromFile(filepath.unsafeRunSync().toString)
Expand All @@ -59,12 +50,10 @@ class BulkAccessUrlDownloaderSpec extends AnyFlatSpec with CromwellTimeoutSpec w

it should "properly construct JSON array from single element list." in {
val expected: String =
s"""|[
| {
| "url" : "https://my.fake/url123",
| "filepath" : "path/to/local/download/dest"
| }
|]""".stripMargin
s"""|[{
| "url": "https://my.fake/url123",
| "filepath": "path/to/local/download/dest"
|}]""".stripMargin

val downloader = BulkAccessUrlDownloader(oneElement)
val filepath: IO[Path] = downloader.generateJsonManifest(oneElement)
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ object Dependencies {
"com.softwaremill.sttp" %% "circe" % sttpV,
"com.github.scopt" %% "scopt" % scoptV,
"org.apache.commons" % "commons-csv" % commonsCsvV,
"io.spray" %% "spray-json" % sprayJsonV,
) ++ circeDependencies ++ catsDependencies ++ slf4jBindingDependencies ++ languageFactoryDependencies ++ azureDependencies

val allProjectDependencies: List[ModuleID] =
Expand Down