Skip to content

Commit

Permalink
Add dataset caching with OhMyArtifacts.jl, Arrow.jl & Serialization.jl
Browse files Browse the repository at this point in the history
  • Loading branch information
frankier committed Nov 1, 2022
1 parent 187da65 commit e4fe7fc
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ authors = ["Frankie Robertson <[email protected]> and contributors"]
version = "0.1.0"

[deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
CodecBzip2 = "523fee87-0ab8-5b00-afb7-3ecf72e48cfd"
CodecXz = "ba30903b-d9e8-5048-a5ec-d1f5b0d4b47b"
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
OhMyArtifacts = "cf8be1f4-309d-442e-839d-29d2a0af6cb7"
RData = "df47a6cb-8c03-5eed-afd8-b6050d6c41da"
Reexport = "189a3867-3050-52da-a836-e630ba90ab69"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Tar = "a4e569a6-e804-4fa4-b0f3-eef7a1d5b13e"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
UrlDownload = "856ac37a-3032-4c1c-9122-f86d88358c8b"
Expand Down
3 changes: 2 additions & 1 deletion src/RDataGet.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ using CodecBzip2
using CodecXz
@reexport using DataFrames

export dataset, datasets, cran_arrow_artifact

include("cran.jl")
include("packages.jl")
include("datasets.jl")
include("dataset.jl")
include("artifacts.jl")

end
70 changes: 70 additions & 0 deletions src/artifacts.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Arrow
using DataFrames
using OhMyArtifacts
using Serialization


# Path of this package's OhMyArtifacts Artifacts.toml. Filled in at module
# load time by __init__ below; code must read it as rdataget_artifacts[].
const rdataget_artifacts = Ref{String}()

# Module initialization hook: resolve (creating if necessary) the artifacts
# TOML via OhMyArtifacts' @my_artifacts_toml! macro.
# NOTE(review): assumes `versioned=false` shares one TOML across package
# versions — confirm against OhMyArtifacts.jl documentation.
function __init__()
    rdataget_artifacts[] = @my_artifacts_toml!(versioned=false)
end

"""
    mk_artifact_name(package_name, version, dataset_name, varname)

Build the cache key for a CRAN dataset artifact: the components joined with
`"__"`, prefixed with `"cran"`. A `nothing` `varname` is simply omitted.
"""
function mk_artifact_name(package_name, version, dataset_name, varname)
    parts = ["cran", package_name, version, dataset_name]
    if !isnothing(varname)
        push!(parts, varname)
    end
    return join(parts, "__")
end

# Magic bytes identifying Julia's built-in serialization format.
# (Non-const globals are untyped and accidentally mutable; this is a constant.)
const JL_SER_MAGIC = Vector{UInt8}(b"7JL")

"""
    load_artifact(path)

Load a cached artifact file. Files written by Julia's serializer (recognised
by the `"7JL"` magic) are read back with `deserialize`; anything else is
assumed to be an Arrow IPC file and materialized as a `DataFrame`.
"""
function load_artifact(path)
    # Julia's serializer emits a one-byte header tag *before* the "7JL" magic
    # (see Serialization.writeheader), so accept the magic at offset 0 or 1;
    # checking only the first three bytes would misclassify serialized files.
    head = open(f -> read(f, 4), path)
    is_julia_ser = (length(head) >= 3 && head[1:3] == JL_SER_MAGIC) ||
                   (length(head) >= 4 && head[2:4] == JL_SER_MAGIC)
    if is_julia_ser
        return deserialize(path)
    else
        # Arrow files start with the "ARROW1" magic, so they never match above.
        return DataFrame(Arrow.Table(path))
    end
end

"""
    serialize_artifact(io, data::Union{CSV.File, DataFrame})

Write tabular `data` to `io` in the Arrow IPC format, which `load_artifact`
reads back via `Arrow.Table`.
"""
serialize_artifact(io, data::Union{CSV.File, DataFrame}) = Arrow.write(io, data)

"""
    serialize_artifact(io, data)

Fallback for non-tabular objects: store `data` on `io` using Julia's
built-in serializer (readable back with `deserialize`).
"""
function serialize_artifact(io, data)
    return serialize(io, data)
end

"""
    cran_arrow_artifact(package_name, dataset_name, varname=nothing, version="auto"; cran_mirror=default_cran_mirror)

Fetch `dataset_name` from CRAN package `package_name`, cache the converted
result as an OhMyArtifacts artifact, and return the cached data via
`load_artifact`.

The cache key is `mk_artifact_name(package_name, version, dataset_name,
varname)`. With `version == "auto"`, the concrete version is discovered from
the downloaded package and — on a cache miss — the artifact is additionally
bound under that resolved-version name.
"""
function cran_arrow_artifact(package_name, dataset_name, varname=nothing, version="auto"; cran_mirror=default_cran_mirror)
    @debug "Saving RDataGet.jl artifact metadata to $(rdataget_artifacts[])"
    if dataset_name == varname
        # normalise: a varname equal to the dataset name adds no information,
        # so drop it to get a canonical artifact name
        varname = nothing
    end
    name = mk_artifact_name(package_name, version, dataset_name, varname)
    additional_name = nothing
    filename = name * ".arrow"
    # NOTE(review): `hash` shadows Base.hash for the rest of this function.
    hash = my_artifact_hash(name, rdataget_artifacts[])
    if isnothing(hash)
        # Cache miss: download the package, convert the dataset, store it as
        # a new artifact, and bind it under `name`.
        hash = create_my_artifact() do artifact_dir
            full_path = joinpath(artifact_dir, filename)
            open(full_path, "w") do io
                package, got_version = get_package_cached(package_name, cran_mirror)
                if version == "auto"
                    # Remember the resolved-version name; it is bound below,
                    # outside this closure. Note it is only ever set on a
                    # cache miss.
                    additional_name = mk_artifact_name(package_name, got_version, dataset_name, varname)
                end
                tbl = table_from_data_dir(package, dataset_name, varname)
                serialize_artifact(io, tbl)
            end
            return full_path
        end
        bind_my_artifact!(rdataget_artifacts[], name, hash)
    end
    if additional_name !== nothing
        additional_name_hash = my_artifact_hash(additional_name, rdataget_artifacts[])
        if isnothing(additional_name_hash)
            bind_my_artifact!(rdataget_artifacts[], additional_name, hash)
        else
            # Assert hashes are the same? (TODO(review): an existing binding
            # is silently kept; verify it matches `hash`.)
        end
    end
    return load_artifact(my_artifact_path(hash))
end
25 changes: 15 additions & 10 deletions src/cran.jl
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
using UrlDownload


"""
    parse_description(description)

Parse the `Package` and `Version` fields out of an R `DESCRIPTION`-style text
blob (also used for each chunk of a CRAN `PACKAGES` index).

Returns a `(name, version)` tuple; either element is `nothing` when the
corresponding field is absent.
"""
function parse_description(description)
    name = nothing
    version = nothing
    for line in split(description, "\n")
        if startswith(line, "Package: ")
            # limit=2 keeps values that themselves contain ": " intact
            name = split(line, ": "; limit=2)[2]
        elseif startswith(line, "Version: ")
            version = split(line, ": "; limit=2)[2]
        end
    end
    return name, version
end

function parse_packages(packages)
packages = String(packages)
verlook = Dict()
for package in split(packages, "\n\n")
name = nothing
version = nothing
for line in split(package, "\n")
if startswith(line, "Package: ")
name = split(line, ": ")[2]
elseif startswith(line, "Version: ")
version = split(line, ": ")[2]
end
end
for description in split(packages, "\n\n")
name, version = parse_description(description)
if name !== nothing && version !== nothing
verlook[name] = version
end
Expand Down
33 changes: 26 additions & 7 deletions src/dataset.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,48 @@ const default_cran_mirror = "https://cloud.r-project.org/"


"""
    dataset(package_name, dataset_name, types=nothing, cran_mirror=default_cran_mirror)

Download (or reuse the cached copy of) CRAN package `package_name` and return
dataset `dataset_name` as a `DataFrame`. `types` is forwarded to `CSV.File`
for CSV-backed datasets.
"""
function dataset(package_name, dataset_name, types=nothing, cran_mirror=default_cran_mirror)
    # get_package_cached returns a (data_dir, version) tuple; only the
    # directory is needed here.
    data_dir, _ = get_package_cached(package_name, cran_mirror)
    # table_from_data_dir's 3rd positional parameter is `varname`; pass
    # `nothing` there so `types` reaches the `types` slot.
    return DataFrame(table_from_data_dir(data_dir, dataset_name, nothing, types))
end

"""
    table_from_data_dir(basename, dataset_name, varname=nothing, types=nothing)

Locate `dataset_name` inside the extracted package data directory `basename`
and return it as a table.

Looks for `<dataset_name>.rda` / `<dataset_name>.RData` first and returns the
variable `varname` (defaulting to `dataset_name`) from the loaded file; falls
back to `<dataset_name>.csv.gz`, returning a `CSV.File` (in which case
`varname` must be `nothing`, and `types` is forwarded to `CSV.File`).
Throws an error when no candidate file exists or the requested variable is
missing from the .rda file.
"""
function table_from_data_dir(basename, dataset_name, varname=nothing, types=nothing)
    rdas = [
        joinpath(basename, string(dataset_name, ".rda")),
        joinpath(basename, string(dataset_name, ".RData"))
    ]
    for rdaname in rdas
        if isfile(rdaname)
            data = load(rdaname)
            # An .rda file can hold several variables; default to the one
            # named like the dataset itself.
            key = isnothing(varname) ? dataset_name : varname
            if !haskey(data, key)
                errmsg = "Could not find variable $key in .rda dataset $dataset_name, available keys: $(keys(data))"
                if isnothing(varname)
                    errmsg *= " (Hint: try specifying the variable name explicitly)"
                end
                error(errmsg)
            end
            return data[key]
        end
    end

    csvname = joinpath(basename, string(dataset_name, ".csv.gz"))
    if isfile(csvname)
        if !isnothing(varname)
            # A CSV file holds exactly one table, so a variable name is
            # meaningless here.
            error("Cannot specify varname for CSV files")
        end
        return open(csvname, "r") do io
            uncompressed = IOBuffer(read(GzipDecompressorStream(io)))
            return CSV.File(
                uncompressed,
                delim=',',
                quotechar='\"',
                missingstring="NA",
                types=types
            )
        end
    end
    error("Unable to locate dataset from " * rdas[1] * " or " * rdas[2] * " or " * csvname)
end
11 changes: 7 additions & 4 deletions src/packages.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
global __package_dirs = Dict()


"""
    is_data_or_description(header)

Tar extraction predicate: keep entries under a package's `data/` directory,
plus the package's top-level `DESCRIPTION` file (needed to read the version).
"""
function is_data_or_description(header)
    bits = split(header.path, "/")
    return (length(bits) >= 2 && bits[2] == "data") || (length(bits) == 2 && bits[2] == "DESCRIPTION")
end

function get_package(package_name, cran_mirror=default_cran_mirror)
Expand All @@ -17,8 +17,11 @@ function get_package(package_name, cran_mirror=default_cran_mirror)
path = Downloads.download(url)
tar_gz = open(path)
tar = GzipDecompressorStream(tar_gz)
data_files = Tar.extract(is_data, tar)
joinpath(data_files, package_name, "data")
data_files = Tar.extract(is_data_or_description, tar)
version = nothing
description = open(f -> read(f, String), joinpath(data_files, package_name, "DESCRIPTION"))
_, version = parse_description(description)
return (joinpath(data_files, package_name, "data"), version)
end

function get_package_cached(package_name, cran_mirror=default_cran_mirror)
Expand Down

0 comments on commit e4fe7fc

Please sign in to comment.