diff --git a/Project.toml b/Project.toml
index 3e4ef149..39879f50 100644
--- a/Project.toml
+++ b/Project.toml
@@ -5,9 +5,6 @@ version = "0.8.4"
 
 [deps]
 CFTime = "179af706-886a-5703-950a-314cd64e0468"
-Combinatorics = "861a8166-3701-5b0c-9a16-15d98fcdc6aa"
-DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
-DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
 Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
 DiskArrayTools = "fcd2136c-9f69-4db6-97e5-f31981721d63"
 DiskArrays = "3c3547ce-8d99-4f5e-a174-61eb10b00ae3"
@@ -15,38 +12,25 @@ Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 FFTW = "7a1cc6ca-52ef-59f5-83cd-3a7055c09341"
 GeoInterface = "cf35fbd7-0cd7-5166-be24-54bfbe79505f"
 Interpolations = "a98d9a8b-a2ab-59e6-89dd-64a1c18fca59"
-IntervalSets = "8197267c-284f-5f27-9208-e0e47529a953"
-IterTools = "c8e1da08-722c-5040-9ed9-7db0dc04731e"
 Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a"
 OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
 Polynomials = "f27b6e38-b328-58d1-80ce-0feddd5e7a45"
-ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
-Requires = "ae029012-a4dd-5104-9daa-d747884805df"
 Shapefile = "8e980c4a-a4fe-5da2-b3a7-4b4b0353a2f4"
 Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
 StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
 Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 WeightedOnlineStats = "bbac0a1f-7c9d-5672-960b-c6ca726e5d5d"
-YAXArrayBase = "90b8fcef-0c2d-428d-9c56-5f86629e9d14"
+YAXArrays = "c21b50f5-aa40-41ea-b809-c0f5e47bfa5c"
+Zarr = "0a941bbe-ad1d-11e8-39d9-ab76183a1d99"
 
 [compat]
-CFTime = "0.0, 0.1"
-Combinatorics = "1"
-DataFrames = "0.19, 0.20"
-DataStructures = "0.17"
-DiskArrays = "0.2"
 FFTW = "1"
 GeoInterface = "0.4, 0.5"
-Interpolations = "0.12"
-IntervalSets = "0.3, 0.4"
-IterTools = "1"
 OnlineStats = "1"
-Polynomials = "0.6"
-ProgressMeter = "1"
+Polynomials = "1"
 Shapefile = "0.6"
-StatsBase = "0.32"
+StatsBase = "0.32, 0.33"
 Tables = "0.2, 1.0"
-WeightedOnlineStats = "0.3, 0.4"
 julia = "1.3"
 
 [extras]
@@ -58,4 +42,4 @@ StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
 [targets]
-test = ["Test", "CSV", "RDatasets", "StatsBase", "ESDC", "NetCDF"]
+test = ["Test", "CSV", "RDatasets", "StatsBase", "NetCDF"]
diff --git a/docs/src/analysis.md b/docs/src/analysis.md
index 743cc5f5..cf7fb5b9 100644
--- a/docs/src/analysis.md
+++ b/docs/src/analysis.md
@@ -117,7 +117,7 @@ ESDL.DAT.cubefittable
 
 ### Online Histograms and quantiles
 
-It is possible to estimate histograms and quantiles of larger-than-memory datasets using an adaptive-bin histogram algorithm. The `Base.quantile` method is overloaded for objects of type `AbstractCubeData`, so the following works to estimate the 10% and 90% quantiles of all datapoints for each variable:
+It is possible to estimate histograms and quantiles of larger-than-memory datasets using an adaptive-bin histogram algorithm. The `Base.quantile` method is overloaded for objects of type `YAXArray`, so the following works to estimate the 10% and 90% quantiles of all datapoints for each variable:
 
 ```julia
 using WeightedOnlineStats
diff --git a/docs/src/cube_access.md b/docs/src/cube_access.md
index 09c53fc2..953be3f9 100644
--- a/docs/src/cube_access.md
+++ b/docs/src/cube_access.md
@@ -74,11 +74,8 @@ cubenew = extractLonLats(cubedata,ll)
 ## Cube Types
 
 While the `subsetcube` command returns an object of type `ZarrCube`, which represents a view into the ESDC, other cube operations will return different types of data cubes.
-The returned type will depend on the size of the returned cube. If it is small enough to fit into memory, it will be a `CubeMem`, otherwise a `ZArrayCube`. All these types of data cubes share the same interface defined by [`ESDL.AbstractCubeData`](@ref), which means you can index them, do calculation using `mapCube` or plot them using the commands described in [Plotting](@ref).
+The returned type will depend on the size of the returned cube. If it is small enough to fit into memory, it will be a `CubeMem`, otherwise a `ZArrayCube`. All these types of data cubes share the same interface, which means you can index them, do calculations using `mapCube` or plot them using the commands described in [Plotting](@ref).
 
-```@docs
-ESDL.Cubes.AbstractCubeData
-```
 
 ```@docs
 ESDL.Cubes.CubeMem
diff --git a/src/Cubes/Axes.jl b/src/Cubes/Axes.jl
deleted file mode 100644
index ff6e4b12..00000000
--- a/src/Cubes/Axes.jl
+++ /dev/null
@@ -1,294 +0,0 @@
-module Axes
-import ..Cubes: caxes, _read, Cubes, AbstractCubeData
-using Dates
-using Base.Iterators: take, drop
-
-"""
-    abstract CubeAxis{T} <: AbstractCubeData{T,1}
-
-Supertype of all axes. Every `CubeAxis` is an 1D Cube itself and can be passed
-to mapCube operations.
-"""
-abstract type CubeAxis{T,S} <: AbstractCubeData{T,1} end
-
-Base.size(x::CubeAxis)=(length(x.values),)
-Base.size(x::CubeAxis,i)=i==1 ? length(x.values) : error("Axis has only a single dimension")
-Base.ndims(x::CubeAxis)=1
-
-
-
-"""
-    CategoricalAxis{T,S}
-
-To represent axes that are categorical, where `T` is the element type.
-The type parameter `S` denotes the axis name (a symbol).
-The default constructor is:
-
-    CategoricalAxis(axname::String,values::Vector{T})
-
-"""
-struct CategoricalAxis{T,S,RT} <: CubeAxis{T,S}
-  values::RT
-end
-
-CategoricalAxis(s::Symbol,v)=CategoricalAxis{eltype(v),s,typeof(v)}(v)
-CategoricalAxis(s::AbstractString,v)=CategoricalAxis(Symbol(s),v)
-
-struct RangeAxis{T,S,R<:AbstractVector{T}} <: CubeAxis{T,S}
-  values::R
-end
-
-"""
-    RangeAxis{T,S,R}
-
-To represent axes that are categorical, where `T` is the element type.
-The type parameter `S` denotes the axis name (a symbol) and `R` the type of the
-range which is used to represent the axis values.
-The default constructor is:
-
-    RangeAxis(axname::String,values::Range{T})
-
-"""
-RangeAxis(s::Symbol,v::AbstractVector{T}) where T = RangeAxis{T,s,typeof(v)}(v)
-RangeAxis(s::AbstractString,v)=RangeAxis(Symbol(s),v)
-
-Base.length(a::CubeAxis)=length(a.values)
-
-"""
-    axcopy(x,vals)
-
-Makes a copy of a `CubeAxis` with the values `vals`
-"""
-axcopy(ax::RangeAxis,vals) = RangeAxis(axname(ax),vals)
-axcopy(ax::CategoricalAxis,vals) = CategoricalAxis(axname(ax),vals)
-
-
-Base.show(io::IO,a::RangeAxis)=print(io,rpad(Axes.axname(a),20," "),"Axis with ",length(a)," Elements from ",first(a.values)," to ",last(a.values))
-function Base.show(io::IO,a::CategoricalAxis)
-  print(io,rpad(Axes.axname(a),20," "), "Axis with ", length(a), " elements: ")
-  if length(a.values)<10
-    for v in a.values
-      print(io,v," ")
-    end
-  else
-    for v in take(a.values,2)
-      print(io,v," ")
-    end
-    print(io,".. ")
-    for v in drop(a.values,length(a.values)-2)
-      print(io,v," ")
-    end
-  end
-end
-
-caxes(x::CubeAxis)=CubeAxis[x]
-
-axname(::Type{<:CubeAxis{<:Any,U}}) where U = string(U)
-axname(::CubeAxis{<:Any,U}) where U = string(U)
-axsym(::CubeAxis{<:Any,S}) where S = S
-
-get_step(r::AbstractRange)=step(r)
-get_step(r::AbstractVector)=length(r)==0 ?
zero(eltype(r)) : r[2]-r[1] - -axVal2Index_ub(a::RangeAxis, v; fuzzy=false)=axVal2Index(a,v-abshalf(get_step(a.values)),fuzzy=fuzzy) -axVal2Index_lb(a::RangeAxis, v; fuzzy=false)=axVal2Index(a,v+abshalf(get_step(a.values)),fuzzy=fuzzy) - -axVal2Index_ub(a::RangeAxis, v::Date; fuzzy=false)=axVal2Index(a,DateTime(v)-abshalf(get_step(a.values)),fuzzy=fuzzy) -axVal2Index_lb(a::RangeAxis, v::Date; fuzzy=false)=axVal2Index(a,DateTime(v)+abshalf(get_step(a.values)),fuzzy=fuzzy) - -abshalf(a) = abs(a/2) -abshalf(a::Day) = abs(Millisecond(a)/2) -abshalf(a::Month) = iseven(Dates.value(a)) ? a/2 : Month(a÷2) + Day(15) - -get_bb(ax::RangeAxis) = first(ax.values)-abshalf(get_step(ax.values)), last(ax.values)+abshalf(get_step(ax.values)) -function axisfrombb(name,bb,n) - offs = (bb[2]-bb[1])/(2*n) - RangeAxis(name,range(bb[1]+offs,bb[2]-offs,length=n)) -end - -function axVal2Index(a::RangeAxis{<:Any,<:Any,<:AbstractRange},v;fuzzy=false) - dt = v-first(a.values) - r = round(Int,dt/step(a.values))+1 - return max(1,min(length(a.values),r)) -end - -convert_time(T::Type{<:TimeType}, v::TimeType) = T(year(v), month(v), day(v), hour(v), minute(v), second(v)) -convert_time(T::Type{<:TimeType}, v::Date) = T(year(v), month(v), day(v), 0, 0, 0) -convert_time(::Type{Date},v::TimeType) = Date(year(v),month(v),day(v)) -convert_time(::Type{Date},v::Date) = Date(year(v),month(v),day(v)) - -function axVal2Index(a::RangeAxis{T},v;fuzzy=false) where T<:TimeType - vconverted = convert_time(T,v) - dd = map(i->abs((i-vconverted)),a.values) - mi,ind = findmin(dd) - return ind -end -function axVal2Index(a::RangeAxis{T,<:Any,<:AbstractRange},v;fuzzy=false) where T<:TimeType - vconverted = convert_time(T,v) - dd = map(i->abs((i-vconverted)),a.values) - mi,ind = findmin(dd) - return ind -end -function axVal2Index(axis::CategoricalAxis{String},v::String;fuzzy::Bool=false) - r=findfirst(isequal(v),axis.values) - if r===nothing - if fuzzy - r=findall(axis.values) do i - startswith(lowercase(i),lowercase(v[1:min(length(i),length(v))])) - end - if length(r)==1 - return(r[1]) - else - error("Could not find unique value of $v in $axis") - end - else - error("$v not found in $axis") - end - end - r -end -axVal2Index(x,v::CartesianIndex{1};fuzzy::Bool=false)=min(max(v.I[1],1),length(x)) -function axVal2Index(x,v;fuzzy::Bool=false) - i = findfirst(isequal(v),x.values) - isa(i,Nothing) && error("Value $v not found in x") - return i -end -abstract type AxisDescriptor end -struct ByName <: AxisDescriptor - name::String -end -struct ByInference <: AxisDescriptor end -struct ByType{T} <: AxisDescriptor - t::Type{T} -end -struct ByValue <: AxisDescriptor - v::CubeAxis -end -struct ByFunction <: AxisDescriptor - f::Function -end - -const VecOrTuple{S} = Union{Vector{<:S},Tuple{Vararg{<:S}}} where S - -#findAxis(a::Any,c::VecOrTuple{<:CubeAxis}) = findAxis(get_descriptor(desc),c) -get_descriptor(a::String)=ByName(a) -get_descriptor(a::Symbol)=ByName(String(a)) -get_descriptor(a::Type{T}) where {T<:CubeAxis}=ByType(a) -get_descriptor(a::CubeAxis)=ByValue(a) -get_descriptor(a::Function)=ByFunction(a) -get_descriptor(a)=error("$a is not a valid axis description") -get_descriptor(a::AxisDescriptor)=a - - - -"Find a certain axis type in a vector of Cube axes and returns the index" -function findAxis(bt::ByType,v::VecOrTuple{CubeAxis}) - a=bt.t - for i=1:length(v) - isa(v[i],a) && return i - end - return nothing -end -function findAxis(bs::ByName,axlist::VecOrTuple{CubeAxis}) - matchstr=bs.name - 
ism=findall(i->startswith(lowercase(axname(i)),lowercase(matchstr)),axlist) - isempty(ism) && return nothing - if length(ism)>1 - f = axlist[ism[1]] - all(i->i==f,ism) || error("Multiple axes found matching string $matchstr") - return f - else - return ism[1] - end -end -function findAxis(bv::ByValue,axlist::VecOrTuple{CubeAxis}) - v=bv.v - return findfirst(i->i==v,axlist) -end -function getAxis(desc,axlist::VecOrTuple{CubeAxis}) - i = findAxis(desc,axlist) - if isa(i,Nothing) - return nothing - else - return axlist[i] - end -end -getOutAxis(desc,axlist,incubes,pargs,f) = getAxis(desc,unique(axlist)) -function getOutAxis(desc::ByFunction,axlist,incubes,pargs,f) - outax = desc.f(incubes,pargs) - isa(outax,CubeAxis) || error("Axis Generation function $(desc.f) did not return an axis") - outax -end -import DataStructures: counter -function getOutAxis(desc::Tuple{ByInference},axlist,incubes,pargs,f) - inAxes = map(caxes,incubes) - inAxSmall = map(i->filter(j->in(j,axlist),i) |>collect,inAxes) - inSizes = map(i->(map(length,i)...,),inAxSmall) - intypes = map(eltype, incubes) - testars = map((s,it)->zeros(it,s...),inSizes, intypes) - map(testars) do ta - ta .= rand(Base.nonmissingtype(eltype(ta)),size(ta)...) - if eltype(ta) >: Missing - # Add some missings - randind = rand(1:length(ta),length(ta)÷10) - ta[randind] .= missing - end - end - resu = f(testars...,pargs...) - isa(resu,AbstractArray) || isa(resu,Number) || isa(resu,Missing) || error("Function must return an array or a number") - (isa(resu,Number) || isa(resu,Missing)) && return () - outsizes = size(resu) - outaxes = map(outsizes,1:length(outsizes)) do s,il - if s>2 - i = findall(i->i==s,length.(axlist)) - if length(i)==1 - return axlist[i[1]] - elseif length(i)>1 - @info "Found multiple matching axes for output dimension $il" - end - end - return RangeAxis("OutAxis$(il)",1:s) - end - if !allunique(outaxes) - #TODO: fallback with axis renaming in this case - error("Could not determine unique output axes from output shape") - end - return (outaxes...,) -end -""" - getAxis(desc::String, c::AbstractCubeData) - -Given the string of an axis name and a cube, returns this axis of the cube. 
-""" -getAxis(desc,c)=getAxis(desc,caxes(c)) -getAxis(desc::ByValue,axlist::Vector{T}) where {T<:CubeAxis}=desc.v - -"Fallback method" -findAxis(desc,c)=findAxis(desc,caxes(c)) -findAxis(a,axlist::VecOrTuple{CubeAxis}) = findAxis(get_descriptor(a),axlist) - -getSubRange(x::CubeAxis,i)=x.values[i],nothing - -renameaxis(r::RangeAxis{T,<:Any,V}, newname) where {T,V} = RangeAxis{T,Symbol(newname),V}(r.values) -renameaxis(r::CategoricalAxis{T,<:Any,V}, newname) where {T,V} = CategoricalAxis{T,Symbol(newname),V}(r.values) - -function _read(ax::CubeAxis, ar::AbstractArray, I::CartesianIndices) - ar[:] .= ax.values[I.indices[1]] -end - -macro caxis_str(s) - :(CategoricalAxis{String,$(QuoteNode(Symbol(s)))}) -end - -import Base.== -import Base.isequal -==(a::CubeAxis,b::CubeAxis)=(a.values==b.values) && (axname(a)==axname(b)) -isequal(a::CubeAxis, b::CubeAxis) = a==b - -using YAXArrayBase: YAXArrayBase -#Implement yaxarray interface -YAXArrayBase.dimname(x::CubeAxis,_) = axname(x) -YAXArrayBase.dimvals(x::CubeAxis,_) = x.values -YAXArrayBase.iscontdim(::RangeAxis,_) = true -YAXArrayBase.iscontdim(::CategoricalAxis,_) = false -end diff --git a/src/Cubes/Cubes.jl b/src/Cubes/Cubes.jl deleted file mode 100644 index 462d1c3c..00000000 --- a/src/Cubes/Cubes.jl +++ /dev/null @@ -1,331 +0,0 @@ -""" -The functions provided by ESDL are supposed to work on different types of cubes. This module defines the interface for all -Data types that -""" -module Cubes -using DiskArrays: DiskArrays, eachchunk -using Distributed: myid -using Dates: TimeType -using IntervalSets: Interval, (..) -using Base.Iterators: take, drop -using ..ESDL: workdir, ESDLDefaults -using YAXArrayBase -import YAXArrayBase: iscompressed, getattributes - -""" - AbstractCubeData{T,N} - -Supertype of all cubes. `T` is the data type of the cube and `N` the number of -dimensions. Beware that an `AbstractCubeData` does not implement the `AbstractArray` -interface. However, the `ESDL` functions [`mapCube`](@ref), [`reduceCube`](@ref), -[`readcubedata`](@ref), [`plotMAP`](@ref) and [`plotXY`](@ref) will work on any subtype -of `AbstractCubeData` -""" -abstract type AbstractCubeData{T,N} end - -Base.eltype(::AbstractCubeData{T}) where T = T -Base.ndims(::AbstractCubeData{<:Any,N}) where N = N - -""" - readCubeData(cube::AbstractCubeData) - -Given any type of `AbstractCubeData` returns a [`CubeMem`](@ref) from it. -""" -function readcubedata(x) - s=size(x) - aout = zeros(eltype(x),s...) - r=CartesianIndices(s) - _read(x,aout,r) - CubeMem(collect(CubeAxis,caxes(x)),aout,getattributes(x)) -end - -""" -This function calculates a subset of a cube's data -""" -function subsetcube end - -function _read end - -"Returns the axes of a Cube" -caxes(c::AbstractCubeData)=error("Axes function not implemented for $(typeof(c))") - -getattributes(::AbstractCubeData)=Dict{String,Any}() - -"Chunks, if given" -cubechunks(c::AbstractCubeData) = (size(c,1),map(i->1,2:ndims(c))...) 
- -"Offset of the first chunk" -chunkoffset(c::AbstractCubeData) = ntuple(i->0,ndims(c)) - -function iscompressed end - -"Supertype of all subtypes of the original data cube" -abstract type AbstractSubCube{T,N} <: AbstractCubeData{T,N} end - - -"Supertype of all in-memory representations of a data cube" -abstract type AbstractCubeMem{T,N} <: AbstractCubeData{T,N} end - -include("Axes.jl") -using .Axes: CubeAxis, RangeAxis, CategoricalAxis, findAxis, getAxis, axVal2Index, - axname, axsym, axVal2Index_lb, axVal2Index_ub, renameaxis, axcopy - -mutable struct CleanMe - path::String - persist::Bool - function CleanMe(path::String,persist::Bool) - c = new(path,persist) - finalizer(clean,c) - c - end -end -function clean(c::CleanMe) - if !c.persist && myid()==1 - if !isdir(c.path) - @warn "Cube directory $(c.path) does not exist. Can not clean" - else - rm(c.path,recursive=true) - end - end -end - -""" - ESDLArray{T,N} <: AbstractCubeMem{T,N} - -An in-memory data cube. It is returned by applying `mapCube` when -the output cube is small enough to fit in memory or by explicitly calling -[`readcubedata`](@ref) on any type of cube. - -### Fields - -* `axes` a `Vector{CubeAxis}` containing the Axes of the Cube -* `data` N-D array containing the data - -""" -struct ESDLArray{T,N,A<:AbstractArray{T,N},AT} <: AbstractCubeData{T,N} - axes::AT - data::A - properties::Dict{String} - cleaner::Vector{CleanMe} -end - -ESDLArray(axes,data,properties=Dict{String,Any}(); cleaner=CleanMe[]) = ESDLArray(axes,data,properties, cleaner) -function ESDLArray(x::AbstractArray) - ax = caxes(x) - props = getattributes(x) - ESDLArray(ax,x,props) -end -Base.size(a::ESDLArray) = size(a.data) -Base.size(a::ESDLArray,i::Int) = size(a.data,i) -Base.permutedims(c::ESDLArray,p)=ESDLArray(c.axes[collect(p)],permutedims(c.data,p),c.properties,c.cleaner) -caxes(c::ESDLArray)=c.axes -function caxes(x) - map(enumerate(dimnames(x))) do a - i,s = a - v = dimvals(x,i) - iscontdim(x,i) ? RangeAxis(s,v) : CategoricalAxis(s,v) - end -end -iscompressed(c::ESDLArray)=iscompressed(c.data) -iscompressed(c::DiskArrays.PermutedDiskArray) = iscompressed(c.a.parent) -iscompressed(c::DiskArrays.SubDiskArray) = iscompressed(c.v.parent) -cubechunks(c::ESDLArray)=common_size(eachchunk(c.data)) -cubechunks(x) = common_size(eachchunk(x)) -common_size(a::DiskArrays.GridChunks) = a.chunksize -function common_size(a) - ntuple(ndims(a)) do idim - otherdims = setdiff(1:ndims(a),idim) - allengths = map(i->length(i.indices[idim]),a) - for od in otherdims - allengths = unique(allengths,dims=od) - end - @assert length(allengths) == size(a,idim) - length(allengths)<3 ? allengths[1] : allengths[2] - end -end - -Base.getindex(x::ESDLArray, i...) = x.data[i...] -chunkoffset(c::ESDLArray)=common_offset(eachchunk(c.data)) -chunkoffset(x) = common_offset(eachchunk(x)) -common_offset(a::DiskArrays.GridChunks) = a.offset -function common_offset(a) - ntuple(ndims(a)) do idim - otherdims = setdiff(1:ndims(a),idim) - allengths = map(i->length(i[idim]),a) - for od in otherdims - allengths = unique(allengths,dims=od) - end - @assert length(allengths) == size(a,idim) - length(allengths)<3 ? 
0 : allengths[2]-allengths[1] - end -end -readcubedata(c::ESDLArray)=ESDLArray(c.axes,Array(c.data),c.properties,CleanMe[]) - -function renameaxis!(c::ESDLArray,p::Pair) - i = findAxis(p[1],c.axes) - c.axes[i]=renameaxis(c.axes[i],p[2]) - c -end -function renameaxis!(c::ESDLArray,p::Pair{<:Any,<:CubeAxis}) - i = findAxis(p[1],c.axes) - i === nothing && throw(ArgumentError("Axis not found")) - length(c.axes[i].values) == length(p[2].values) || throw(ArgumentError("Length of replacement axis must equal length of old axis")) - c.axes[i]=p[2] - c -end - -# function getSubRange(c::AbstractArray,i...;write::Bool=true) -# length(i)==ndims(c) || error("Wrong number of view arguments to getSubRange. Cube is: $c \n indices are $i") -# return view(c,i...) -# end -# getSubRange(c::Tuple{AbstractArray{T,0},AbstractArray{UInt8,0}};write::Bool=true) where {T}=c - -function _subsetcube end - -function subsetcube(z::ESDLArray{T};kwargs...) where T - newaxes, substuple = _subsetcube(z,collect(Any,map(Base.OneTo,size(z)));kwargs...) - newdata = view(z.data,substuple...) - ESDLArray(newaxes,newdata,z.properties,cleaner = z.cleaner) -end - -sorted(x,y) = xsubs[i],length(subs)) - inewaxes = findall(i->isa(i,AbstractVector),substuple) - newaxes = newaxes[inewaxes] - @assert length.(newaxes) == map(length,filter(i->isa(i,AbstractVector),collect(substuple))) - newaxes, substuple -end - -include(joinpath(@__DIR__,"../DatasetAPI/countrydict.jl")) - -Base.getindex(a::AbstractCubeData;kwargs...) = subsetcube(a;kwargs...) - -Base.read(d::AbstractCubeData) = getindex(d,fill(Colon(),ndims(d))...) - -function formatbytes(x) - exts=["bytes","KB","MB","GB","TB"] - i=1 - while x>=1024 - i=i+1 - x=x/1024 - end - return string(round(x, digits=2)," ",exts[i]) -end -cubesize(c::AbstractCubeData{T}) where {T}=(sizeof(T)+1)*prod(map(length,caxes(c))) -cubesize(c::AbstractCubeData{T,0}) where {T}=sizeof(T)+1 - -getCubeDes(::CubeAxis)="Cube axis" -getCubeDes(::ESDLArray)="ESDL data cube" -getCubeDes(::Type{T}) where T = string(T) -Base.show(io::IO,c::AbstractCubeData) = show_yax(io,c) - -function show_yax(io::IO,c) - println(io,getCubeDes(c), " with the following dimensions") - for a in caxes(c) - println(io,a) - end - - foreach(getattributes(c)) do p - if p[1] in ("labels","name","units") - println(io,p[1],": ",p[2]) - end - end - println(io,"Total size: ",formatbytes(cubesize(c))) -end - - -using Markdown -struct ESDLVarInfo - project::String - longname::String - units::String - url::String - comment::String - reference::String -end -Base.isless(a::ESDLVarInfo, b::ESDLVarInfo) = isless(string(a.project, a.longname),string(b.project, b.longname)) - -import Base.show -function show(io::IO,::MIME"text/markdown",v::ESDLVarInfo) - un=v.units - url=v.url - re=v.reference - pr = v.project - ln = v.longname - co = v.comment - mdt=md""" -### $ln -*$(co)* - -* **Project** $(pr) -* **units** $(un) -* **Link** $(url) -* **Reference** $(re) -""" - mdt[3].items[1][1].content[3]=[" $pr"] - mdt[3].items[2][1].content[3]=[" $un"] - mdt[3].items[3][1].content[3]=[" $url"] - mdt[3].items[4][1].content[3]=[" $re"] - show(io,MIME"text/markdown"(),mdt) -end -show(io::IO,::MIME"text/markdown",v::Vector{ESDLVarInfo})=foreach(x->show(io,MIME"text/markdown"(),x),v) - -""" - cubeinfo(cube) - -Shows the metadata and citation information on variables contained in a cube. 
-""" -function cubeinfo(ds::ESDLArray, variable="unknown") - p = ds.properties - vi=ESDLVarInfo( - get(p,"project_name", "unknown"), - get(p,"long_name",variable), - get(p,"units","unknown"), - get(p,"url","no link"), - get(p,"comment",variable), - get(p,"references","no reference") - ) -end - -include("TransformedCubes.jl") -include("Slices.jl") -end diff --git a/src/Cubes/Slices.jl b/src/Cubes/Slices.jl deleted file mode 100644 index 84774b1d..00000000 --- a/src/Cubes/Slices.jl +++ /dev/null @@ -1,39 +0,0 @@ -import Base: / -using ..ESDLTools: PickAxisArray -struct YAXSlice{T,N,P} - c::P - sliceaxes - otheraxes -end -function YAXSlice(p,dims) - N = ndims(p) - ax = caxes(p) - isliceax = (findAxis.(dims,Ref(ax))...,) - any(isnothing,isliceax) && error("Axis not in cube") - iotherax = (filter(i->!in(i,isliceax),1:N)...,) - YAXSlice{Array{eltype(p),length(dims)},N-length(dims),typeof(p)}(p,(isliceax,getindex.(Ref(ax),isliceax)), (iotherax,getindex.(Ref(ax),iotherax))) -end -/(c::ESDLArray, s::Union{String,Symbol}) = YAXSlice(c,(s,)) -/(c::ESDLArray, s) = YAXSlice(c,s) - -dimvals(x::YAXSlice, i) = x.c.axes[x.otheraxes[1][i]].values - -function dimname(x::YAXSlice, i) - axsym(x.otheraxes[2][i]) -end - -getattributes(x::YAXSlice) = x.c.properties - -iscontdim(x::YAXSlice, i) = isa(x.c.axes[x.otheraxes[1][i]], RangeAxis) - -function getdata(x::YAXSlice) - m = fill!(Array{Union{Colon,Bool}}(undef,ndims(x.c)),true) - m[collect(x.sliceaxes[1])] .= Colon() - PickAxisArray(x.c.data, m) -end - -getCubeDes(s::YAXSlice) = string(join(axname.(s.sliceaxes[2]), " x "), " slices over an ", getCubeDes(s.c)) -cubesize(s::YAXSlice) = cubesize(s.c) -Base.ndims(s::YAXSlice{<:Any,N}) where N = N - -Base.show(io::IO,s::YAXSlice) = ESDL.Cubes.show_yax(io,s) diff --git a/src/Cubes/TransformedCubes.jl b/src/Cubes/TransformedCubes.jl deleted file mode 100644 index e4b34300..00000000 --- a/src/Cubes/TransformedCubes.jl +++ /dev/null @@ -1,43 +0,0 @@ -export ConcatCube, concatenateCubes -export mergeAxes -import ..Cubes: ESDLArray, caxes, iscompressed, cubechunks, chunkoffset -using DiskArrayTools: diskstack - -function Base.map(op, incubes::ESDLArray...) - axlist=copy(caxes(incubes[1])) - all(i->caxes(i)==axlist,incubes) || error("All axes must match") - props=merge(getattributes.(incubes)...) - ESDLArray(axlist,broadcast(op,map(c->c.data,incubes)...),props,map(i->i.cleaner,incubes)) -end - -""" - function concatenateCubes(cubelist, cataxis::CategoricalAxis) - -Concatenates a vector of datacubes that have identical axes to a new single cube along the new -axis `cataxis` -""" -function concatenateCubes(cl,cataxis::CubeAxis) - length(cataxis.values)==length(cl) || error("cataxis must have same length as cube list") - axlist=copy(caxes(cl[1])) - T=eltype(cl[1]) - N=ndims(cl[1]) - cleaners = CleanMe[] - append!(cleaners,cl[1].cleaner) - for i=2:length(cl) - all(caxes(cl[i]).==axlist) || error("All cubes must have the same axes, cube number $i does not match") - eltype(cl[i])==T || error("All cubes must have the same element type, cube number $i does not match") - ndims(cl[i])==N || error("All cubes must have the same dimension") - append!(cleaners,cl[i].cleaner) - end - props=mapreduce(getattributes,merge,cl,init=getattributes(cl[1])) - ESDLArray([axlist...,cataxis],diskstack([c.data for c in cl]),props, cleaners) -end -function concatenateCubes(;kwargs...) 
- cubenames = String[] - for (n,c) in kwargs - push!(cubenames,string(n)) - end - cubes = map(i->i[2],collect(kwargs)) - findAxis("Variable",cubes[1]) === nothing || error("Input cubes must not contain a variable kwarg concatenation") - concatenateCubes(cubes, CategoricalAxis("Variable",cubenames)) -end diff --git a/src/DAT/DAT.jl b/src/DAT/DAT.jl deleted file mode 100644 index 13094e45..00000000 --- a/src/DAT/DAT.jl +++ /dev/null @@ -1,747 +0,0 @@ -module DAT -export mapCube -import ..Cubes -using ..ESDLTools -using Distributed: RemoteChannel, nworkers,pmap, - @everywhere, workers, remotecall_fetch, - remote_do, myid, nprocs -import ..Cubes: cubechunks, iscompressed, chunkoffset, - CubeAxis, AbstractCubeData, ESDLArray, - caxes, YAXSlice -import ..Cubes.Axes: AxisDescriptor, axname, ByInference, axsym, - getOutAxis, getAxis, findAxis -import ..Datasets: Dataset, createdataset -import ...ESDL -import ...ESDL.workdir -import ProgressMeter: Progress, next!, progress_pmap -using YAXArrayBase -using Dates -global const debugDAT=[false] -#TODO use a logging package -macro debug_print(e) - debugDAT[1] && return(:(println($e))) - :() -end - -include("registration.jl") - -""" -Internal representation of an input cube for DAT operations -""" -mutable struct InputCube{N} - cube::Any #The input data cube - desc::InDims #The input description given by the user/registration - axesSmall::Vector{CubeAxis} #List of axes that were actually selected through the desciption - icolon::Vector{Int} - colonperm::Union{Vector{Int},Nothing} - loopinds::Vector{Int} #Indices of loop axes that this cube does not contain, i.e. broadcasts - cachesize::Vector{Int} #Number of elements to keep in cache along each axis -end - -function InputCube(c, desc::InDims) - iaxessmall = findAxis.(desc.axisdesc,Ref(c)) - axlist = caxes(c) - axesSmall = map(i->axlist[i], iaxessmall) - colonperm = issorted(iaxessmall) ? nothing : collect(Base.invperm(sortperm(collect(iaxessmall)))) - iaxessmall = collect(iaxessmall) - any(isequal(nothing),axesSmall) && error("One of the input axes not found in input cubes") - InputCube{ndims(c)}(c,desc,collect(CubeAxis,axesSmall),iaxessmall,colonperm,Int[],Int[]) -end -geticolon(ic::InputCube) = ic.icolon -createworkarrays(T,s,ntr)=[Array{T}(undef,s...) 
for i=1:ntr] - - - - -""" -Internal representation of an output cube for DAT operations -""" -mutable struct OutputCube - cube::Any #The actual outcube cube, once it is generated - desc::OutDims #The description of the output axes as given by users or registration - axesSmall::Array{CubeAxis} #The list of output axes determined through the description - allAxes::Vector{CubeAxis} #List of all the axes of the cube - loopinds::Vector{Int} #Index of the loop axes that are broadcasted for this output cube - innerchunks - outtype -end - -const InOutCube = Union{InputCube,OutputCube} - -getsmallax(c::InOutCube)=c.axesSmall -geticolon(c::OutputCube)=1:length(c.axesSmall) -getAxis(desc,c::InOutCube) = getAxis(desc,c.cube) - -function getworkarray(c::InOutCube,ntr) - wa = createworkarrays(eltype(c.cube),length.(c.axesSmall),ntr) - map(wa) do w - wrapWorkArray(c.desc.artype,w,c.axesSmall) - end -end - -function interpretoutchunksizes(desc,axesSmall,incubes) - if desc.chunksize == :max - map(ax->axname(ax)=>length(ax),axesSmall) - elseif desc.chunksize == :input - map(axesSmall) do ax - for cc in incubes - i = findAxis(axname(ax),cc) - if i !== nothing - return axname(ax)=>min(length(ax),cubechunks(cc)[i]) - end - end - return axname(ax)=>length(ax) - end - else - desc.chunksize - end -end - -getOutAxis(desc::Tuple,inAxes,incubes,pargs,f)=map(i->getOutAxis(i,inAxes,incubes,pargs,f),desc) -function OutputCube(desc::OutDims,inAxes::Vector{CubeAxis},incubes,pargs,f) - axesSmall = getOutAxis(desc.axisdesc,inAxes,incubes,pargs,f) - outtype = getOuttype(desc.outtype,incubes) - innerchunks = interpretoutchunksizes(desc,axesSmall,incubes) - OutputCube(nothing,desc,collect(CubeAxis,axesSmall),CubeAxis[],Int[],innerchunks,outtype) -end - -""" -Configuration object of a DAT process. This holds all necessary information to perform the calculations. -It contains the following fields: - -- `incubes::NTuple{NIN,InputCube}` The input data cubes -- `outcube::NTuple{NOUT,OutputCube}` The output data cubes -allInAxes :: Vector -LoopAxes :: Vector -ispar :: Bool -loopcachesize :: Vector{Int} -max_cache -fu -inplace :: Bool -include_loopvars:: Bool -ntr -addargs -kwargs -""" -mutable struct DATConfig{NIN,NOUT} - "The input data cubes" - incubes :: NTuple{NIN,InputCube} - - outcubes :: NTuple{NOUT,OutputCube} - allInAxes :: Vector - LoopAxes :: Vector - ispar :: Bool - loopcachesize :: Vector{Int} - max_cache - fu - inplace :: Bool - include_loopvars:: Bool - ntr - addargs - kwargs -end -function DATConfig(cdata,indims,outdims,inplace,max_cache,fu,ispar,include_loopvars,nthreads,addargs,kwargs) - - isa(indims,InDims) && (indims=(indims,)) - isa(outdims,OutDims) && (outdims=(outdims,)) - length(cdata)==length(indims) || error("Number of input cubes ($(length(cdata))) differs from registration ($(length(indims)))") - incubes = ([InputCube(o[1],o[2]) for o in zip(cdata,indims)]...,) - allInAxes = vcat([ic.axesSmall for ic in incubes]...) - outcubes = ((map(1:length(outdims),outdims) do i,desc - OutputCube(desc,allInAxes,cdata,addargs,fu) - end)...,) - - DATConfig( - incubes, - outcubes, - allInAxes, - CubeAxis[], # LoopAxes - ispar, - Int[], - max_cache, # max_cache - fu, # fu # loopcachesize - inplace, # inplace - include_loopvars, - nthreads, - addargs, # addargs - kwargs - ) -end - - -getOuttype(outtype::Int,cdata)=eltype(cdata[outtype]) -function getOuttype(outtype::DataType,cdata) - outtype -end - -mapCube(fu::Function,cdata,addargs...;kwargs...)=mapCube(fu,(cdata,),addargs...;kwargs...) 
- -function mapCube(f, in_ds::Dataset, addargs...; indims=InDims(), outdims=OutDims(), inplace=true, kwargs...) - allars = values(in_ds.cubes) - allaxes = collect(values(in_ds.axes)) - arnames = keys(in_ds.cubes) - sarnames = (arnames...,) - any(ad->findAxis(ad,allaxes)===nothing,indims.axisdesc) && error("One of the Dimensions does not exist in Dataset") - - idar = collect(indims.axisdesc) - allindims = map(allars) do c - idshort = filter(idar) do ad - findAxis(ad,c)!==nothing - end - InDims((idshort...,), indims.artype, indims.procfilter) - end - isa(outdims, OutDims) || error("Only one output cube currently supported for datasets") - isempty(addargs) || error("Additional arguments currently not supported for datasets, use kwargs instead") - if inplace - fnew = let arnames=collect(arnames), f=f - function dsfun(xout,xin...; kwargs...) - incubes = NamedTuple{sarnames, typeof(xin)}(xin) - f(xout,incubes; kwargs...) - end - end - else - fnew = let arnames=collect(arnames), f=f - function dsfun(xin...; kwargs...) - incubes = NamedTuple{sarnames, typeof(xin)}(xin) - f(incubes; kwargs...) - end - end - end - allcubes = collect(values(in_ds.cubes)) - mapCube(fnew,(allcubes...,);indims=allindims,outdims=outdims, inplace=inplace, kwargs...) -end - -import Base.mapslices -function mapslices(f,d::Union{AbstractCubeData, Dataset},addargs...;dims,kwargs...) - isa(dims,String) && (dims=(dims,)) - mapCube(f,d,addargs...;indims = InDims(dims...),outdims = OutDims(ByInference()),inplace=false,kwargs...) -end - - - -""" - mapCube(fun, cube, addargs...;kwargs) - -Map a given function `fun` over slices of the data cube `cube`. - -### Keyword arguments - -* `max_cache=1e7` maximum size of blocks that are read into memory, defaults to approx 10Mb -* `indims::InDims` List of input cube descriptors of type [`InDims`](@ref) for each input data cube -* `outdims::OutDims` List of output cube descriptors of type [`OutDims`](@ref) for each output cube -* `inplace` does the function write to an output array inplace or return a single value> defaults to `true` -* `ispar` boolean to determine if parallelisation should be applied, defaults to `true` if workers are available. -* `showprog` boolean indicating if a ProgressMeter shall be shown -* `include_loopvars` boolean to indicate if the varoables looped over should be added as function arguments -* `loopchunksize` determines the chunk sizes of variables which are looped over, a dict -* `kwargs` additional keyword arguments passed to the inner function - -The first argument is always the function to be applied, the second is the input cube or -a tuple input cubes if needed. -""" -function mapCube(fu::Function, - cdata::Tuple,addargs...; - max_cache=ESDL.ESDLDefaults.max_cache[], - indims=InDims(), - outdims=OutDims(), - inplace=true, - ispar=nprocs()>1, - debug=false, - include_loopvars=false, - showprog=true, - nthreads=ispar ? Dict(i=>remotecall_fetch(Threads.nthreads,i) for i in workers()) : [Threads.nthreads()] , - loopchunksize = Dict(), - kwargs...) - - #Translate slices - if any(i->isa(i,YAXSlice),cdata) - inew = map(cdata) do d - isa(d,YAXSlice) ? InDims(axname.(d.sliceaxes[2])...) : InDims() - end - cnew = map(i->isa(i,YAXSlice) ? i.c : i, cdata) - return mapCube(fu, cnew, addargs...; - indims=inew, outdims=outdims, inplace=inplace, ispar=ispar, - debug=debug, include_loopvars=include_loopvars, showprog=showprog, - nthreads=nthreads, kwargs...) 
- end - @debug_print "Generating DATConfig" - dc=DATConfig(cdata,indims,outdims,inplace, - max_cache,fu,ispar,include_loopvars,nthreads,addargs,kwargs) - @debug_print "Analysing Axes" - analyzeAxes(dc) - @debug_print "Calculating Cache Sizes" - getCacheSizes(dc,loopchunksize) - @debug_print "Generating Output Cube" - generateOutCubes(dc) - @debug_print "Running main Loop" - debug && return(dc) - runLoop(dc,showprog) - @debug_print "Finalizing Output Cube" - - if length(dc.outcubes)==1 - return dc.outcubes[1].cube - else - return (map(i->i.cube,dc.outcubes)...,) - end - -end - -function makeinplace(f) - (args...;kwargs...)->begin - first(args).=f(Base.tail(args)...;kwargs...) - nothing - end -end - - -function getchunkoffsets(dc::DATConfig) - co = zeros(Int,length(dc.LoopAxes)) - lc = dc.loopcachesize - for ic in dc.incubes - for (ax,cocur,cs) in zip(caxes(ic.cube),chunkoffset(ic.cube),cubechunks(ic.cube)) - ii = findAxis(ax,dc.LoopAxes) - if !isa(ii,Nothing) && iszero(co[ii]) && cocur>0 && mod(lc[ii],cs)==0 - co[ii]=cocur - end - end - end - (co...,) -end - -updatears(clist,r,f,caches) = foreach(clist, caches) do ic, ca - updatear(f,r, ic.cube,geticolon(ic),ic.loopinds, ca ) -end -getindsall(indscol, loopinds, rfunc, colfunc = _->Colon()) = getindsall((),1,(sort(indscol)...,),(loopinds...,),rfunc,colfunc) -function getindsall(indsall,inow,indscol,loopinds,rfunc,colfunc) - if !isempty(indscol) && first(indscol) == inow - getindsall((indsall..., colfunc(inow)),inow+1,Base.tail(indscol),loopinds,rfunc,colfunc) - else - getindsall((indsall..., rfunc(first(loopinds))),inow+1,indscol,Base.tail(loopinds),rfunc,colfunc) - end -end -getindsall(indsall,inow,::Tuple{},::Tuple{},r,c) = indsall - - -function updatear(f,r,cube,indscol,loopinds,cache) - indsall = getindsall(indscol,loopinds,i->r[i]) - l2 = map((i,s)->isa(i,Colon) ? s : length(i),indsall,size(cache)) - if size(cache) != l2 - hinds = map((i,s)->isa(i,Colon) ? (1:s) : 1:length(i),indsall,size(cache)) - if f == :read - cache[hinds...] = getdata(cube)[indsall...] - else - getdata(cube)[indsall...] = cache[hinds...] - end - else - if f == :read - d = getdata(cube)[indsall...] - cache[:] = d - else - getdata(cube)[indsall...] = cache - end - end -end -updateinars(dc,r,incaches)=updatears(dc.incubes,r,:read,incaches) -writeoutars(dc,r, outcaches)=updatears(dc.outcubes,r,:write,outcaches) - -function loopworker(dcchan::RemoteChannel, ranchan, reschan) - loopworker(take!(dcchan), ranchan, reschan) -end -function loopworker(dc::DATConfig, ranchan, reschan) - incaches, outcaches, args = try - getallargs(dc) - catch e - println("Error during initialization: ", e) - put!(reschan, e) - end - try - loopworker(dc,ranchan, reschan, incaches, outcaches, args) - catch e - if e isa InvalidStateException - return nothing - else - println("Error during running loop: ", e) - put!(reschan, e) - end - end -end - -function loopworker(dc,ranchan, reschan, incaches, outcaches, args) - while true - r = take!(ranchan) - updateinars(dc,r,incaches) - innerLoop(r,args...) 
- writeoutars(dc,r,outcaches) - @async put!(reschan,r) - end -end - -function runLoop(dc::DATConfig,showprog) - allRanges=distributeLoopRanges((dc.loopcachesize...,),(map(length,dc.LoopAxes)...,),getchunkoffsets(dc)) - chnlfunc() = Channel{eltype(allRanges)}(length(allRanges)) - outchanfunc() = Channel(length(allRanges)) - inchan, outchan, dcpass = if dc.ispar - dcpass = RemoteChannel(()->Channel{DATConfig}(nworkers())) - for i=1:nworkers() - put!(dcpass, dc) - end - RemoteChannel(chnlfunc), RemoteChannel(outchanfunc), dcpass - else - chnlfunc(), outchanfunc(), dc - end - #Now distribute the jobs - for r in allRanges - put!(inchan,r) - end - #And start the workers - if dc.ispar - for p in workers() - remote_do(ESDL.DAT.loopworker, p, dcpass, inchan, outchan) - end - else - @async loopworker(dc, inchan, outchan) - end - showprog && (pm=Progress(length(allRanges))) - for i in 1:length(allRanges) - r = take!(outchan) - if isa(r,Exception) - throw(r) - end - showprog && next!(pm) - end - close(inchan) - dc.outcubes -end - -abstract type AxValCreator end -struct NoLoopAxes <:AxValCreator end -struct AllLoopAxes{S,V} <:AxValCreator - loopsyms::S - loopaxvals::V -end -AllLoopAxes(a) = AllLoopAxes(map(axsym,a), map(i->i.values,a)) -getlaxvals(::NoLoopAxes,cI,offscur) = () -getlaxvals(a::AllLoopAxes,cI,offscur) = (NamedTuple{a.loopsyms}(map((ax,i,of)->(i+of,ax[i+of]),a.loopaxvals,cI.I, offscur)),) - - -function getallargs(dc::DATConfig) - incache, outcache = getCubeCache(dc) - filters = map(ic->ic.desc.procfilter,dc.incubes) - inworkar, outworkar = generateworkarrays(dc) - axvals = if dc.include_loopvars - lax = (dc.LoopAxes...,) - AllLoopAxes(lax) - else - NoLoopAxes() - end - adda = dc.addargs - kwa = dc.kwargs - fu = if !dc.inplace - makeinplace(dc.fu) - else - dc.fu - end - inarsbc = map(dc.incubes, incache) do ic, cache - allax = getindsall(geticolon(ic), ic.loopinds, i->true) - if ic.colonperm === nothing - pa = PickAxisArray(cache,allax) - else - pa = PickAxisArray(cache,allax,ic.colonperm) - end - end - outarsbc = map(dc.outcubes, outcache) do oc, cache - allax = getindsall(1:length(oc.axesSmall), oc.loopinds, i->true) - pa = PickAxisArray(cache,allax) - pa - end - incache, outcache, (fu,inarsbc,outarsbc,filters,inworkar,outworkar, - axvals,adda,kwa) -end - - -function getbackend(oc,ispar,max_cache) - eltype=Union{oc.outtype,Missing} - outsize=sizeof(eltype)*(length(oc.allAxes)>0 ? prod(map(length,oc.allAxes)) : 1) - rt = oc.desc.backend - if rt == :auto - if ispar[] || outsize>max_cache - rt = :zarr - else - rt = :array - end - end - b = YAXArrayBase.backendlist[Symbol(rt)] - if !allow_parallel_write(b) - ispar[] = false - end - eltype,b -end - -function generateOutCube(::Type{T},eltype,oc::OutputCube,loopcachesize,co;kwargs...) where T - cs_inner = (map(length,oc.axesSmall)...,) - cs = (cs_inner..., loopcachesize...) - for (i,cc) in enumerate(oc.innerchunks) - if cc !== nothing - cs = Base.setindex(cs,cc,i) - end - end - oc.cube=createdataset(T, oc.allAxes; chunksize=cs, chunkoffset=co, kwargs...) -end -function generateOutCube(::Type{T},eltype,oc::OutputCube,loopcachesize,co;kwargs...) where T<:Array - newsize=map(length,oc.allAxes) - outar=Array{eltype}(undef,newsize...) - map!(_->_zero(eltype),outar,1:length(outar)) - oc.cube = ESDLArray(oc.allAxes,outar) -end -_zero(T) = zero(T) -_zero(T::Type{<:AbstractString}) = convert(T,"") - - -function generateOutCubes(dc::DATConfig) - co = getchunkoffsets(dc) - foreach(dc.outcubes) do c - co2 = (zeros(Int,length(c.axesSmall))...,co...) 
- generateOutCube(c,Ref(dc.ispar),dc.max_cache,dc.loopcachesize,co2) - end -end -function generateOutCube(oc::OutputCube,ispar::Ref{Bool},max_cache,loopcachesize,co) - eltype,cubetype = getbackend(oc,ispar,max_cache) - generateOutCube(cubetype,eltype,oc,loopcachesize,co;oc.desc.backendargs...) -end - -function getCubeCache(dc::DATConfig) - outcaches = map(i->allocatecachebuf(i,dc.loopcachesize),dc.outcubes) - incaches = map(i->allocatecachebuf(i,dc.loopcachesize),dc.incubes) - incaches, outcaches -end - -function allocatecachebuf(ic::Union{InputCube,OutputCube},loopcachesize) where N - s = size(ic.cube) - indsall = getindsall(geticolon(ic), ic.loopinds, i->loopcachesize[i], i->s[i]) - zeros(eltype(ic.cube),indsall...) -end - -function init_DATworkers() - freshworkermodule() -end - -function analyzeAxes(dc::DATConfig{NIN,NOUT}) where {NIN,NOUT} - - for cube in dc.incubes - for a in caxes(cube.cube) - in(a,cube.axesSmall) || in(a,dc.LoopAxes) || push!(dc.LoopAxes,a) - end - end - length(dc.LoopAxes)==length(unique(map(typeof,dc.LoopAxes))) || error("Make sure that cube axes of different cubes match") - for cube=dc.incubes - myAxes = caxes(cube.cube) - for (il,loopax) in enumerate(dc.LoopAxes) - in(typeof(loopax),map(typeof,myAxes)) && push!(cube.loopinds,il) - end - end - #Add output broadcast axes - for outcube=dc.outcubes - LoopAxesAdd=CubeAxis[] - for (il,loopax) in enumerate(dc.LoopAxes) - push!(outcube.loopinds,il) - push!(LoopAxesAdd,loopax) - end - outcube.allAxes=CubeAxis[outcube.axesSmall;LoopAxesAdd] - dold = outcube.innerchunks - newchunks = Union{Int,Nothing}[nothing for _ in 1:length(outcube.allAxes)] - for k in keys(dold) - ii = findAxis(k,outcube.allAxes) - if ii !== nothing - newchunks[ii] = dold[k] - end - end - outcube.innerchunks = newchunks - end - #And resolve names in chunk size dicts - return dc -end - -mysizeof(x)=sizeof(x) -mysizeof(x::Type{String})=1 - -""" -Function that compares two cache miss specifiers by their importance -""" -function cmpcachmisses(x1,x2) - #First give preference to compressed misses - if xor(x1.iscompressed,x2.iscompressed) - return x1.iscompressed - #Now compare the size of the miss multiplied with the inner size - else - return x1.cs * x1.innerleap > x2.cs * x2.innerleap - end -end - -function getCacheSizes(dc::DATConfig,loopchunksizes) - - inAxlengths = Vector{Int}[Int.(length.(cube.axesSmall)) for cube in dc.incubes] - inblocksizes = map((x,T)->isempty(x) ? mysizeof(eltype(T.cube)) : prod(x)*mysizeof(eltype(T.cube)),inAxlengths,dc.incubes) - inblocksize,imax = findmax(inblocksizes) - outblocksizes = map(C->length(C.axesSmall)>0 ? sizeof(C.outtype)*prod(map(Int∘length,C.axesSmall)) : 1,dc.outcubes) - outblocksize = length(outblocksizes) > 0 ? findmax(outblocksizes)[1] : 1 - #Now add cache miss information for each input cube to every loop axis - cmisses= NamedTuple{(:iloopax,:cs, :iscompressed, :innerleap,:preventpar),Tuple{Int64,Int64,Bool,Int64,Bool}}[] - userchunks = Dict{Int,Int}() - for (k,v) in loopchunksizes - ii = findAxis(k,dc.LoopAxes) - if ii !== nothing - userchunks[ii] = v - end - end - foreach(dc.LoopAxes,1:length(dc.LoopAxes)) do lax,ilax - haskey(userchunks,ilax) && return nothing - for ic in dc.incubes - ii = findAxis(lax,ic.cube) - if !isa(ii,Nothing) - inax = isempty(ic.axesSmall) ? 
1 : prod(map(length,ic.axesSmall)) - push!(cmisses,(iloopax = ilax,cs = cubechunks(ic.cube)[ii],iscompressed = iscompressed(ic.cube), innerleap=inax,preventpar=false)) - end - end - for oc in dc.outcubes - cs = oc.innerchunks - ii = findAxis(lax,oc.allAxes) - if !isa(ii,Nothing) && !isa(cs[ii],Nothing) - innerleap = isempty(oc.axesSmall) ? 1 : prod(map(length,oc.axesSmall)) - push!(cmisses,(iloopax = ilax,cs = cs[ii],iscompressed = iscompressed(oc.cube), innerleap=innerleap, preventpar=true)) - end - end - end - sort!(cmisses,lt=cmpcachmisses) - loopcachesize,nopar = getLoopCacheSize(max(inblocksize,outblocksize),map(length,dc.LoopAxes),dc.max_cache, cmisses, userchunks) - for cube in dc.incubes - cube.cachesize = map(length,cube.axesSmall) - for (cs,loopAx) in zip(loopcachesize,dc.LoopAxes) - in(typeof(loopAx),map(typeof,caxes(cube.cube))) && push!(cube.cachesize,cs) - end - end - nopar && (dc.ispar = false) - dc.loopcachesize=loopcachesize - return dc -end - -"Calculate optimal Cache size to DAT operation" -function getLoopCacheSize(preblocksize,loopaxlengths,max_cache,cmisses,userchunks) - totcachesize=max_cache - loopcachesize = ones(Int,length(loopaxlengths)) - #Go through user definitions - incfac=totcachesize/preblocksize/prod(loopcachesize) - incfac<1 && error("The requested slices do not fit into the specified cache. Please consider increasing max_cache") - - # Go through list of cache misses first and decide - imiss = 1 - while imiss<=length(cmisses) - il = cmisses[imiss].iloopax - s = min(cmisses[imiss].cs,loopaxlengths[il])/loopcachesize[il] - #Check if cache size is already set for this axis - if loopcachesize[il]==1 - if s1 && rem(s,ii)!=0 - ii=ii-1 - end - loopcachesize[il]=ii - break - end - end - imiss+=1 - end - #Now second run to read multiple blocks at once - if incfac >= 2 - for i=1:length(loopcachesize) - haskey(userchunks,i) && continue - imul = min(floor(Int,incfac),loopaxlengths[i]÷loopcachesize[i]) - loopcachesize[i] = loopcachesize[i] * imul - incfac = incfac/imul - incfac < 2 && break - end - end - if imissi.preventpar,cmisses[imiss:end]) - return loopcachesize, nopar -end - -function distributeLoopRanges(block_size::NTuple{N,Int},loopR::NTuple{N,Int},co) where N - allranges = map(block_size,loopR,co) do bs,lr,cocur - collect(filter(!isempty,[max(1,i):min(i+bs-1,lr) for i in (1-cocur):bs:lr])) - end - Iterators.product(allranges...) -end - -function generateworkarrays(dc::DATConfig) - inwork = map(i->getworkarray(i,dc.ntr[myid()]),dc.incubes) - outwork = map(i->getworkarray(i,dc.ntr[myid()]),dc.outcubes) - inwork, outwork -end - -macro innercode() - esc(quote - ithr = Threads.threadid() - #Pick the correct array according to thread - myinwork = map(i->i[ithr],inwork) - myoutwork = map(i->i[ithr],outwork) - #Copy data into work arrays - foreach(myinwork,xinBC) do iw,x - iw.=view(x,cI.I...) - end - #Apply filters - mvs = map(docheck,filters,myinwork) - if any(mvs) - # Set all outputs to missing - foreach(ow->fill!(ow,missing),myoutwork) - else - #Compute loop axis values if necessary - laxval = getlaxvals(axvalcreator,cI,offscur) - #Finally call the function - f(myoutwork...,myinwork...,laxval...,addargs...;kwargs...) 
- end - #Copy data into output array - foreach((iw,x)->view(x,cI.I...).=iw,myoutwork,xoutBC) - end) -end - -using DataStructures: OrderedDict -using Base.Cartesian -@noinline function innerLoop(loopRanges,f,xinBC,xoutBC,filters, - inwork,outwork,axvalcreator,addargs,kwargs) - offscur = map(i->(first(i)-1), loopRanges) - if length(inwork[1])==1 - for cI in CartesianIndices(map(i->1:length(i),loopRanges)) - @innercode - end - else - Threads.@threads for cI in CartesianIndices(map(i->1:length(i),loopRanges)) - @innercode - end - end -end - - -"Calculate an axis permutation that brings the wanted dimensions to the front" -function getFrontPerm(dc,dims) - ax=caxes(dc) - N=length(ax) - perm=Int[i for i=1:length(ax)]; - iold=Int[] - for i=1:length(dims) push!(iold,findAxis(dims[i],ax)) end - iold2=sort(iold,rev=true) - for i=1:length(iold) splice!(perm,iold2[i]) end - perm=Int[iold;perm] - return ntuple(i->perm[i],N) -end - -include("dciterators.jl") -include("tablestats.jl") -end diff --git a/src/DAT/SentinelMissings.jl b/src/DAT/SentinelMissings.jl deleted file mode 100644 index 5015b67a..00000000 --- a/src/DAT/SentinelMissings.jl +++ /dev/null @@ -1,45 +0,0 @@ -module SentinelMissings -struct SentinelMissing{T,SV} <: Number - x::T -end -Base.promote_rule(::Type{SentinelMissing{SM,SV}},::Union{T,Missing}) where {SM, SV, T}=Union{promote_type(SM,T),Missing} -Base.promote_rule(::Type{SentinelMissing{SM,SV}},::Type{T}) where {SM,SV,T} = Union{promote_type(SM,T),Missing} -Base.promote_rule(::Type{SentinelMissing{SM,SV}},::Type{SentinelMissing{SM,SV}}) where {SM,SV} = Union{SM,Missing} -Base.convert(::Type{Union{T,Missing}},sm::SentinelMissing) where {T} = ismissing(sm) ? missing : convert(T,sm[]) -Base.convert(::Type{T},sm::SentinelMissing) where T<:Number = convert(T,sm[]) -Base.getindex(x::SentinelMissing)=ismissing(x) ? missing : x.x -Base.convert(::Type{Any}, sm::SentinelMissing) = sm[] -Base.ismissing(x::SentinelMissing) = isequal(x.x,senval(x)) -Base.nonmissingtype(::Type{SentinelMissing{T,SV}}) where {T,SV} = T -Base.isless(x::SentinelMissing,y::SentinelMissing) = isless(x[],y[]) -Base.isless(x::SentinelMissing,y::Number) = isless(x[],y) -Base.isless(y::Number, x::SentinelMissing) = isless(y,x[]) -function Base.convert(::Type{<:T}, x::Number) where T<:SentinelMissing - sv = senval(T) - et = eltype(T) - SentinelMissing{et,sv}(convert(et,x)) -end -Base.convert(::Type{<:T}, x::Missing) where T<:SentinelMissing = SentinelMissing{eltype(T),senval(T)}(senval(T)) -for op in (:(+),:(-),:(*),:(/),:(^)) - eval(:(Base.$(op)(x1::T,x2::T) where T<:SentinelMissing{SM} where SM = $(op)(convert(Union{SM,Missing},x1),convert(Union{SM,Missing},x2)))) -end -Base.zero(T::Type{<:SentinelMissing{SM}}) where SM = T(zero(SM)) -Base.one(T::Type{<:SentinelMissing{SM}}) where SM = T(one(SM)) -Base.convert(::Type{<:T}, x::T) where T<:SentinelMissing = x -senval(::SentinelMissing{<:Any,SV}) where SV = SV -senval(::Type{<:SentinelMissing{<:Any,SV}}) where SV = SV -Base.eltype(::Type{<:SentinelMissing{T}}) where T = T -Base.show(io::IO,x::SentinelMissing)=print(io,x[]) -Base.similar(x::AbstractArray{<:SentinelMissing{T,SV}}) where {T,SV} = zeros(Union{T,Missing},size(x)) -Base.similar(x::AbstractArray{<:SentinelMissing{T,SV}},dims::Tuple{Vararg{Int64,N}}) where {T,SV,N} = zeros(Union{T,Missing},dims) - -""" - as_sentinel(x, v) - -Reinterprets a Number Array or a Number `x` so that values in x that equal v will be treated as missing. -This is done by reinterpreting the array as a `SentinelMissing` without copying the data. 
-""" -as_sentinel(x::AbstractArray{T},v) where T<:Number = reinterpret(SentinelMissing{T,convert(T,v)},x) -as_sentinel(x::T,v) where T<:Number = SentinelMissing{T,convert(T,v)}(x) -export as_sentinel -end diff --git a/src/DAT/dciterators.jl b/src/DAT/dciterators.jl deleted file mode 100644 index a62d9324..00000000 --- a/src/DAT/dciterators.jl +++ /dev/null @@ -1,218 +0,0 @@ -include("SentinelMissings.jl") -import .SentinelMissings -import ESDL.DAT: DATConfig -import ESDL.ESDLTools: PickAxisArray - -struct CubeIterator{R,ART,ARTBC,LAX,ILAX,S} - dc::DATConfig - r::R - inars::ART - inarsBC::ARTBC - loopaxes::LAX -end -Base.IteratorSize(::Type{<:CubeIterator})=Base.HasLength() -Base.IteratorEltype(::Type{<:CubeIterator})=Base.HasEltype() -Base.eltype(i::Type{<:CubeIterator{A,B,C,D,E,F}}) where {A,B,C,D,E,F} = F - -tuplelen(::Type{<:NTuple{N,<:Any}}) where N=N - -lift64(::Type{Float32})=Float64 -lift64(::Type{Int32})=Int64 -lift64(T)=T - -defaultval(t::Type{<:AbstractFloat})=convert(t,NaN) -defaultval(t::Type{<:Signed})=typemin(t)+1 -defaultval(t::Type{<:Unsigned})=typemax(t)-1 - -function CubeIterator(dc,r;varnames::Tuple=ntuple(i->Symbol("x$i"),length(dc.incubes)),include_loopvars=()) - loopaxes = ntuple(i->dc.LoopAxes[i],length(dc.LoopAxes)) - inars, _ = getCubeCache(dc) - length(varnames) == length(dc.incubes) || error("Supplied $(length(varnames)) varnames and $(length(dc.incubes)) cubes.") - inarsbc = map(dc.incubes, inars) do ic, cache - allax = getindsall(geticolon(ic), ic.loopinds, i->true) - if ic.colonperm === nothing - pa = PickAxisArray(cache,allax) - else - pa = PickAxisArray(cache,allax,ic.colonperm) - end - end - et = map(inarsbc) do ibc - ti = eltype(ibc) - if ti <: AbstractArray - ti - else - ti = Base.nonmissingtype(ti) - SentinelMissings.SentinelMissing{ti,defaultval(ti)} - end - end - if include_loopvars == true - include_loopvars = map(axname,(loopaxes...,)) - end - if isa(include_loopvars,Tuple) && !isempty(include_loopvars) - ilax = map(i->findAxis(i,collect(loopaxes)),include_loopvars) - any(isequal(nothing),ilax) && error("Axis not found in cubes") - et=(et...,map(i->eltype(loopaxes[i]),ilax)...) 
- else - ilax=() - end - - elt = NamedTuple{(map(Symbol,varnames)...,map(Symbol,include_loopvars)...),Tuple{et...}} - CubeIterator{typeof(r),typeof(inars),typeof(inarsbc),typeof(loopaxes),ilax,elt}(dc,r,inars,inarsbc,loopaxes) -end -iternames(::CubeIterator{<:Any,<:Any,<:Any,<:Any,<:Any,E}) where E = E -tuplenames(t::Type{<:NamedTuple{N}}) where N = string.(N) -function Base.show(io::IO,ci::CubeIterator{<:Any,<:Any,<:Any,<:Any,<:Any,E}) where E - print(io,"Datacube iterator with ", length(ci), " elements with fields: ",tuplenames(E)) -end -Base.length(ci::CubeIterator)=prod(length.(ci.loopaxes)) -function Base.iterate(ci::CubeIterator) - rnow,blockstate = iterate(ci.r) - updatears(ci.dc.incubes,rnow,:read,ci.inars) - innerinds = CartesianIndices(length.(rnow)) - indnow, innerstate = iterate(innerinds) - offs = map(i->first(i)-1,rnow) - getrow(ci,ci.inarsBC,indnow,offs), - (rnow=rnow, - blockstate=blockstate, - innerinds = innerinds, - innerstate=innerstate - ) -end -function Base.iterate(ci::CubeIterator,s) - t1 = iterate(s.innerinds,s.innerstate) - N = tuplelen(eltype(ci.r)) - if t1 == nothing - t2 = iterate(ci.r,s.blockstate) - if t2 == nothing - return nothing - else - rnow = t2[1] - blockstate = t2[2] - updatears(ci.dc.incubes,rnow,:read,ci.inars) - innerinds = CartesianIndices(length.(rnow)) - indnow,innerstate = iterate(innerinds) - - end - else - rnow, blockstate = s.rnow, s.blockstate - innerinds = s.innerinds - indnow, innerstate = iterate(innerinds,s.innerstate) - end - offs = map(i->first(i)-1,rnow) - getrow(ci,ci.inarsBC,indnow,offs),(rnow=rnow::NTuple{N,UnitRange{Int64}}, - blockstate=blockstate::Int64, - innerinds=innerinds::CartesianIndices{N,NTuple{N,Base.OneTo{Int64}}}, - innerstate=innerstate::CartesianIndex{N}) -end -abstract type CubeRow -end -abstract type CubeRowAx<:CubeRow -end - -function getrow(ci::CubeIterator{<:Any,<:Any,<:Any,<:Any,ILAX,S},inarsBC,indnow,offs) where {ILAX,S} - axvalsall = map((ax,i,o)->ax.values[i+o],ci.loopaxes,indnow.I,offs) - axvals = map(i->axvalsall[i],ILAX) - cvals = map(i->i[indnow],inarsBC) - S((cvals...,axvals...)) -end -function getrow(ci::CubeIterator{<:Any,<:Any,<:Any,<:Any,(),S},inarsBC,indnow,offs) where S - cvals = map(i->i[indnow],inarsBC) - S(cvals) -end - -function Base.show(io::IO,s::CubeRow) - print(io,"Cube Row: ") - for n in propertynames(s) - print(io,string(n), "=",getproperty(s,n)," ") - end -end -function Base.show(io::IO,s::CubeRowAx) - print(io,"Cube Row: ") - for n in propertynames(s) - print(io,string(n), "=",getproperty(s,n)," ") - end -end -function Base.show(io::IO,s::Type{<:CubeRow}) - foreach(fieldnames(s)) do fn - print(io,fn,"::",fieldtype(s,fn),", ") - end -end -function Base.iterate(s::CubeRow,state=1) - allnames = propertynames(s) - if state<=length(allnames) - (getproperty(s,allnames[state]),state+1) - else - nothing - end -end - - -import DataStructures: OrderedDict -export CubeTable -""" - CubeTable(c::AbstractCubeData...) - -Function to turn a DataCube object into an iterable table. Takes a list of as arguments, -specified as a `name=cube` expression. For example -`CubeTable(data=cube1,country=cube2)` would generate a Table with the entries `data` and `country`, -where `data` contains the values of `cube1` and `country` the values of `cube2`. The cubes -are matched and broadcasted along their axes like in `mapCube`. - -In addition, one can specify -`axes=(ax1,ax2...)` when one wants to include the values of certain axes in the table. 
For example -the command `(CubeTable(tair=cube1 axes=("lon","lat","time"))` would produce an iterator over a data structure -with entries `tair`, `lon`, `lat` and `time`. - -Lastly there is an option to specify which axis shall be the fastest changing when iterating over the cube. -For example `CubeTable(tair=cube1,fastest="time"` will ensure that the iterator will always loop over consecutive -time steps of the same location. -""" -function CubeTable(;include_axes=(),expandaxes=(),cubes...) - c = (map((k,v)->v,keys(cubes),values(cubes))...,) - all(i->isa(i,AbstractCubeData),c) || throw(ArgumentError("All inputs must be DataCubes")) - varnames = map(string,keys(cubes)) - expandaxes = isa(expandaxes,Tuple) ? expandaxes : (expandaxes,) - inaxnames = Set{String}() - indims = if isempty(expandaxes) - map(i->InDims(),c) - else - map(c) do i - axn = filter(collect(expandaxes)) do ax - findAxis(ax,i) !== nothing - end - foreach(j->push!(inaxnames,axname(getAxis(j,i))), axn) - InDims(axn...) - end - end - axnames = map(i->axname.(caxes(i)),c) - allvars = union(axnames...) - allnums = collect(1:length(allvars)) - - configiter = mapCube(identity,c,debug=true,indims=indims,outdims=(),ispar=false); - # if inax !== nothing - # linax = length(inax) - # foreach(configiter.incubes) do ic1 - # if !isempty(ic1.axesSmall) - # empty!(ic1.axesSmall) - # map!(i->i+1,ic1.loopinds,ic1.loopinds) - # pushfirst!(ic1.loopinds,1) - # else - # map!(i->i+1,ic1.loopinds,ic1.loopinds) - # end - # end - # end - r = collect(distributeLoopRanges((configiter.loopcachesize...,),(map(length,configiter.LoopAxes)...,),getchunkoffsets(configiter))) - ci = CubeIterator(configiter,r,include_loopvars=include_axes,varnames=varnames) -end - -import Tables -Tables.istable(::Type{<:CubeIterator}) = true -Tables.rowaccess(::Type{<:CubeIterator}) = true -Tables.rows(x::CubeIterator) = x -Tables.schema(x::CubeIterator) = Tables.schema(typeof(x)) -Tables.schema(x::Type{<:CubeIterator}) = Tables.Schema(fieldnames(eltype(x)),map(s->fieldtype(eltype(x),s),fieldnames(eltype(x)))) - -Tables.istable(::Type{<:AbstractCubeData}) = true -Tables.rowaccess(::Type{<:AbstractCubeData}) = true -Tables.rows(x::AbstractCubeData) = CubeTable(value = x, include_axes=true) -Tables.schema(x::AbstractCubeData) = Tables.schema(Tables.rows(x)) diff --git a/src/DAT/registration.jl b/src/DAT/registration.jl deleted file mode 100644 index 0b7737ec..00000000 --- a/src/DAT/registration.jl +++ /dev/null @@ -1,109 +0,0 @@ -export InDims, OutDims -const AxisDescriptorAll = Union{AxisDescriptor,String,Type{T},CubeAxis,Function} where T<:CubeAxis -using ..Cubes.Axes: get_descriptor, ByFunction -using ...ESDL: workdir, ESDLDefaults -using DataFrames: DataFrame -using YAXArrayBase: yaxcreate - -wrapWorkArray(::Type{Array},a,axes) = a -wrapWorkArray(T,a,axes) = yaxcreate(T,a,map(axname,axes),map(i->i.values,axes),nothing) -# import DataFrames -# function wrapWorkArray(t::AsDataFrame,a,cablabaxes) -# colnames = map(Symbol,cablabaxes[2].values) -# df = DataFrames.DataFrame(a,colnames) -# if t.dimcol -# df[Symbol(axname(cablabaxes[1]))]=collect(cablabaxes[1].values) -# end -# df -# end - -abstract type ProcFilter end -struct AllMissing <: ProcFilter end -struct NValid <: ProcFilter - n::Int -end -struct AnyMissing <: ProcFilter end -struct AnyOcean <: ProcFilter end -struct NoFilter <: ProcFilter end -struct StdZero <: ProcFilter end -struct UserFilter{F} <: ProcFilter - f::F -end - -checkskip(::NoFilter,x) = false -checkskip(::AllMissing,x::AbstractArray) = all(ismissing,x) 
-checkskip(::AllMissing,df::DataFrame) = any(map(i->all(ismissing,getindex(df,i)),names(df))) -checkskip(::AnyMissing,x::AbstractArray) = any(ismissing,x) -checkskip(::AnyMissing,df::DataFrame) = any(map(i->any(ismissing,getindex(df,i)),names(df))) -checkskip(nv::NValid,x::AbstractArray) = count(!ismissing,x) <= nv.n -checkskip(uf::UserFilter,x) = uf.f(x) -checkskip(::StdZero,x) = all(i->i==x[1],x) -docheck(pf::ProcFilter,x)::Bool = checkskip(pf,x) -docheck(pf::Tuple,x) = reduce(|,map(i->docheck(i,x),pf)) - -getprocfilter(f::Function) = (UserFilter(f),) -getprocfilter(pf::ProcFilter) = (pf,) -getprocfilter(pf::NTuple{N,<:ProcFilter}) where N = pf - -""" - InDims(axisdesc;...) - -Creates a description of an Input Data Cube for cube operations. Takes a single - or a Vector/Tuple of axes as first argument. Axes can be specified by their - name (String), through an Axis type, or by passing a concrete axis. - -### Keyword arguments - -* `artype` how shall the array be represented in the inner function. Defaults to `AsArray`, alternatives are `AsDataFrame` or `AsAxisArray` -* `filter` define some filter to skip the computation, e.g. when all values are missing. Defaults to - `AllMissing()`, possible values are `AnyMissing()`, `AnyOcean()`, `StdZero()`, `NValid(n)` - (for at least n non-missing elements). It is also possible to provide a custom one-argument function - that takes the array and returns `true` if the compuation shall be skipped and `false` otherwise. -""" -mutable struct InDims - axisdesc::Tuple - artype - procfilter::Tuple -end -function InDims(axisdesc::AxisDescriptorAll...; artype=Array, filter = AllMissing()) - descs = map(get_descriptor,axisdesc) - any(i->isa(i,ByFunction),descs) && error("Input cubes can not be specified through a function") - InDims(descs,artype,getprocfilter(filter)) -end - - -struct OutDims - axisdesc - backend::Symbol - backendargs - update::Bool - artype - chunksize::Any - outtype::Union{Int,DataType} -end -""" - OutDims(axisdesc;...) - -Creates a description of an Output Data Cube for cube operations. Takes a single - or a Vector/Tuple of axes as first argument. Axes can be specified by their - name (String), through an Axis type, or by passing a concrete axis. - -- `axisdesc`: List of input axis names -- `backend` : specifies the dataset backend to write data to, must be either :auto or a key in `YAXArrayBase.backendlist` -- `update` : specifies wether the function operates inplace or if an output is returned -- `artype` : specifies the Array type inside the inner function that is mapped over -- `chunksize`: A Dict specifying the chunksizes for the output dimensions of the cube, or `:input` to copy chunksizes from input cube axes or `:max` to not chunk the inner dimensions -- `outtype`: force the output type to a specific type, defaults to `Any` which means that the element type of the first input cube is used -""" -function OutDims(axisdesc...; - backend=:auto, - update=false, - artype=Array, - chunksize=ESDLDefaults.chunksize[], - outtype=1, - backendargs...) 
- descs = get_descriptor.(axisdesc) - OutDims(descs,backend,backendargs,update,artype,chunksize,outtype) -end - -registerDATFunction(a...;kwargs...)=@warn("Registration does not exist anymore, ignoring....") diff --git a/src/DAT/tablestats.jl b/src/DAT/tablestats.jl deleted file mode 100644 index 9acc1d3f..00000000 --- a/src/DAT/tablestats.jl +++ /dev/null @@ -1,304 +0,0 @@ -import OnlineStats: OnlineStat, Extrema, fit!, value -import ...Cubes.Axes: CategoricalAxis, RangeAxis -import IterTools -using WeightedOnlineStats -import ProgressMeter: next!, Progress - -import WeightedOnlineStats: WeightedOnlineStat -abstract type TableAggregator end -struct OnlineAggregator{O,S}<:TableAggregator - o::O -end -function OnlineAggregator(O::OnlineStat,s::Symbol) where N - OnlineAggregator{typeof(O),s}(copy(O)) -end -cubeeltype(t::OnlineAggregator)=Float64 -function fitrow!(o::OnlineAggregator{T,S},r) where {T<:OnlineStat,S} - v = getproperty(r,S) - !ismissing(v) && fit!(o.o,v) -end -value(o::OnlineAggregator)=value(o.o) -struct WeightOnlineAggregator{O,S,W}<:TableAggregator - o::O - w::W -end -function WeightOnlineAggregator(O::WeightedOnlineStat,s::Symbol,w) where N - WeightOnlineAggregator{typeof(O),s,typeof(w)}(copy(O),w) -end -cubeeltype(t::WeightOnlineAggregator{T}) where T = cubeeltype(T) -value(o::WeightOnlineAggregator)=value(o.o) -function fitrow!(o::WeightOnlineAggregator{T,S},r) where {T<:OnlineStat,S} - v = getproperty(r,S) - w = o.w(r) - if !checkmiss(v) && !ismissing(w) - fit!(o.o,v,w) - end -end -checkmiss(v) = ismissing(v) -checkmiss(v::AbstractVector) = any(ismissing,v) -struct GroupedOnlineAggregator{O,S,BY,W,C}<:TableAggregator - d::O - w::W - by::BY - cloneobj::C -end -value(o::GroupedOnlineAggregator)=Dict(zip(keys(o.d),map(value,(values(o.d))))) -struct SymType{S} -end -SymType(s::Symbol)=SymType{s}() -(f::SymType{S})(x) where S=getproperty(x,S) -getbytypes(et,by) = Tuple{map(i->Base.nonmissingtype(Base.return_types(i,Tuple{et})[1]),by)...} -cubeeltype(t::GroupedOnlineAggregator{T}) where T=cubeeltype(T) -cubeeltype(t::Type{<:Dict{<:Any,T}}) where T = cubeeltype(T) -cubeeltype(t::Type{<:WeightedOnlineStat{T}}) where T = T -cubeeltype(t::Type{<:OnlineStat{Number}}) = Float64 -cubeeltype(t::Type{<:WeightedCovMatrix{T}}) where T = T -cubeeltype(t::Type{<:Extrema{T}}) where T = T - - -function GroupedOnlineAggregator(O::OnlineStat,s::Symbol,by,w,iter) where N - ost = typeof(O) - et = eltype(iter) - bytypes = Tuple{map(i->Base.nonmissingtype(Base.return_types(i,Tuple{et})[1]),by)...} - d = Dict{bytypes,ost}() - GroupedOnlineAggregator{typeof(d),s,typeof(by),typeof(w),ost}(d,w,by,O) -end - -dicteltype(::Type{<:Dict{K,V}}) where {K,V} = V -dictktype(::Type{<:Dict{K,V}}) where {K,V} = K -actval(v) = v -actval(v::SentinelMissings.SentinelMissing) = v[] -function fitrow!(o::GroupedOnlineAggregator{T,S,BY,W},r) where {T,S,BY,W} - v = getproperty(r,S) - if !ismissing(v) - w = o.w(r) - if w==nothing - bykey = map(i->actval(i(r)),o.by) - if !any(ismissing,bykey) - if haskey(o.d,bykey) - fit!(o.d[bykey],v) - else - o.d[bykey] = copy(o.cloneobj) - fit!(o.d[bykey],v) - end - end - else - if !ismissing(w) - bykey = map(i->actval(i(r)),o.by) - if !any(ismissing,bykey) - if haskey(o.d,bykey) - fit!(o.d[bykey],v,w) - else - o.d[bykey] = copy(o.cloneobj) - fit!(o.d[bykey],v,w) - end - end - end - end - end -end -export TableAggregator, fittable, cubefittable -function TableAggregator(iter,O,fitsym;by=(),weight=nothing) - !isa(by,Tuple) && (by=(by,)) - if !isempty(by) - weight==nothing && 
(weight=(i->nothing)) - by = map(i->isa(i,Symbol) ? (SymType(i)) : i,by) - GroupedOnlineAggregator(O,fitsym,by,weight,iter) - else - if weight==nothing - OnlineAggregator(O,fitsym) - else - WeightOnlineAggregator(O,fitsym,weight) - end - end -end - -function tooutaxis(::SymType{s},iter::CubeIterator{<:Any,<:Any,<:Any,<:Any,<:Any,S},k,ibc) where {s,S} - ichosen = findfirst(i->i===s,fieldnames(S)) - if ichosen<=length(iter.inars) - bycube = iter.dc.incubes[ichosen].cube - if haskey(bycube.properties,"labels") - idict=bycube.properties["labels"] - axname=get(bycube.properties,"name","Label") - outAxis=CategoricalAxis(axname,collect(String,values(idict))) - convertdict=Dict(k=>i for (i,k) in enumerate(keys(idict))) - else - sort!(k) - outAxis=CategoricalAxis("Label$(ibc)",k) - convertdict=Dict(k=>i for (i,k) in enumerate(k)) - end - else - iax = findAxis(string(s),iter.dc.LoopAxes) - outAxis=iter.dc.LoopAxes[iax] - convertdict=Dict(k=>i for (i,k) in enumerate(outAxis.values)) - end - outAxis,convertdict -end -function tooutaxis(f,iter::CubeIterator{<:Any,<:Any,<:Any,<:Any,<:Any,S},k,ibc) where S - sort!(k) - outAxis=CategoricalAxis("Category$(ibc)",k) - convertdict=Dict(k=>i for (i,k) in enumerate(k)) - outAxis,convertdict -end - -varsym(::WeightOnlineAggregator{<:Any,S}) where S = S -varsym(::OnlineAggregator{<:Any,S}) where S = S -varsym(::GroupedOnlineAggregator{<:Any,S}) where S = S -axt(::CategoricalAxis)=CategoricalAxis -axt(::RangeAxis)=RangeAxis -getStatType(::WeightOnlineAggregator{T}) where T = T -getStatType(::OnlineAggregator{T}) where T = T -getStatType(t::GroupedOnlineAggregator{T}) where T=getStatType(T) -getStatType(t::Type{<:Dict{<:Any,T}}) where T = T - -getStatOutAxes(tab,agg)=getStatOutAxes(tab,agg,getStatType(agg)) -getStatOutAxes(tab,agg,::Type{<:OnlineStat})=() -function getStatOutAxes(tab, agg, ::Type{<:Extrema}) - ( CategoricalAxis(:Extrema, ["min", "max"]), ) -end -function getStatOutAxes(tab,agg,::Type{<:WeightedCovMatrix}) - varn = fieldnames(eltype(tab)) - s = varsym(agg) - icube = findfirst(isequal(s),varn) - ax = tab.dc.incubes[icube].axesSmall[1] - oldname = ESDL.Cubes.Axes.axname(ax) - coname = string("Co",oldname) - v = ax.values - axtype = axt(ax) - a1 = axtype(oldname,copy(v)) - a2 = axtype(coname,copy(v)) - (a1,a2) -end -function getStatOutAxes(tab,agg,::Type{<:WeightedAdaptiveHist}) - nbin = getnbins(agg) - a1 = RangeAxis("Bin",1:nbin) - a2 = CategoricalAxis("Hist",["MidPoints","Frequency"]) - (a1,a2) -end -function getByAxes(iter,agg::GroupedOnlineAggregator) - by = agg.by - ntuple(length(by)) do ibc - bc=agg.by[ibc] - tooutaxis(bc,iter,unique(map(i->i[ibc],collect(keys(agg.d)))),ibc) - end -end -getByAxes(iter,agg)=() - -function tooutcube( - agg, - iter, - post - ) - outaxby = getByAxes(iter,agg) - axby = map(i->i[1],outaxby) - convdictall = map(i->i[2],outaxby) - - outaxstat = getStatOutAxes(iter,agg) - outax = (outaxstat...,axby...) 
- snew = map(length,outax) - aout = fill!(zeros(Union{cubeeltype(agg),Missing},snew),missing) - filloutar(aout,convdictall,agg,map(i->1:length(i),outaxstat),post) - - ESDLArray(collect(outax),aout) -end -function filloutar(aout,convdictall,agg::GroupedOnlineAggregator,s,post) - for (k,v) in agg.d - i = CartesianIndices((s...,map((i,d)->d[convert(keytype(d),i)]:d[convert(keytype(d),i)],k,convdictall)...)) - aout[i.indices...].=post(v) - end -end -function filloutar(aout,convdictall,agg,g,post) - copyto!(aout,post(agg.o)) -end -""" - fittable(tab,o,fitsym;by=(),weight=nothing) - -Loops through an iterable table `tab` and thereby fitting an OnlineStat `o` with the values -specified through `fitsym`. Optionally one can specify a field (or tuple) to group by. -Any groupby specifier can either be a symbol denoting the entry to group by or an anynymous -function calculating the group from a table row. - -For example the following would caluclate a weighted mean over a cube weighted by grid cell -area and grouped by country and month: - -````julia -fittable(iter,WeightedMean,:tair,weight=(i->abs(cosd(i.lat))),by=(i->month(i.time),:country)) -```` -""" -function fittable(tab,o,fitsym;by=(),weight=nothing,showprog=false) - agg = TableAggregator(tab,o,fitsym,by=by,weight=weight) - if showprog - runfitrows_progress(agg,tab) - else - foreach(i->fitrow!(agg,i),tab) - end - agg -end -fittable(tab,o::Type{<:OnlineStat},fitsym;kwargs...)=fittable(tab,o(),fitsym;kwargs...) - -@noinline function runfitrows_progress(agg,tab) - p = Progress(length(tab)÷100,1) - every = 0 - for row in tab - fitrow!(agg,row) - every += 1 - if every == 100 - next!(p) - every=0 - end - end -end - -struct collectedValue{V,S,SY} - value::V - laststruct::S -end - -function Base.getproperty(s::collectedValue{<:Any,<:Any,SY},sy::Symbol) where SY - if sy == SY - getfield(s,:value) - else - getproperty(getfield(s,:laststruct),sy) - end -end - -function collectval(row::Union{Tuple, Vector},::Val{SY}) where SY - nvars = length(row) - v = ntuple(i->getfield(row[i],SY),nvars) |> collect - val = collectedValue{typeof(v),typeof(row[end]),SY}(v, row[end]) -end - -getpostfunction(s::OnlineStat)=getpostfunction(typeof(s)) -getpostfunction(::Type{<:OnlineStat})=value -function getpostfunction(w::WeightedAdaptiveHist) - nb = w.alg.b - i->begin - r = hcat(value(i)...) - if size(r,1)push!(axesall,a),ax) - end - axesall = collect(axesall) - axnameall = axsym.(axesall) - axesnew = Dict{Symbol,CubeAxis}(axnameall[i]=>axesall[i] for i in 1:length(axesall)) - Dataset(OrderedDict(cubesnew), axesnew) -end - -function Base.show(io::IO,ds::Dataset) - println(io,"ESDL Dataset") - println(io,"Dimensions: ") - foreach(a->println(io," ", a),values(ds.axes)) - print(io,"Variables: ") - foreach(i->print(io,i," "),keys(ds.cubes)) -end -function Base.propertynames(x::Dataset, private=false) - if private - Symbol[:cubes; :axes; keys(x.cubes); keys(x.axes)] - else - Symbol[collect(keys(x.cubes)); collect(keys(x.axes))] - end -end -function Base.getproperty(x::Dataset,k::Symbol) - if k === :cubes - return getfield(x,:cubes) - elseif k === :axes - return getfield(x,:axes) - else - x[k] - end -end -Base.getindex(x::Dataset,i::Symbol) = haskey(x.cubes, i) ? x.cubes[i] : haskey(x.axes,i) ? x.axes[i] : throw(ArgumentError("$i not found in Dataset")) -function Base.getindex(x::Dataset,i::Vector{Symbol}) - cubesnew = Dict{Symbol, AbstractCubeData}(j=>x.cubes[j] for j=i) - Dataset(;cubesnew...) 
-end - -function fuzzyfind(s::String,comp::Vector{String}) - sl = lowercase(s) - f = findall(i->startswith(lowercase(i),sl),comp) - if length(f) != 1 - throw(KeyError("Name $s not found")) - else - f[1] - end -end -function Base.getindex(x::Dataset,i::Vector{String}) - istr = string.(keys(x.cubes)) - ids = map(name->fuzzyfind(name,istr),i) - syms = map(j->Symbol(istr[j]),ids) - cubesnew = Dict{Symbol, AbstractCubeData}(Symbol(i[j])=>x.cubes[syms[j]] for j=1:length(ids)) - Dataset(;cubesnew...) -end -Base.getindex(x::Dataset,i::String)=getproperty(x,Symbol(i)) -function subsetcube(x::Dataset; var=nothing, kwargs...) - if var ===nothing - cc = x.cubes - Dataset(;map(ds->ds=>subsetcube(cc[ds];kwargs...),collect(keys(cc)))...) - elseif isa(var,String) || isa(var, Symbol) - subsetcube(getproperty(x,Symbol(var));kwargs...) - else - cc = x[var].cubes - Dataset(;map(ds->ds=>subsetcube(cc[ds];kwargs...),collect(keys(cc)))...) - end -end -function collectdims(g) - dlist = Set{Tuple{String,Int,Int}}() - varnames = get_varnames(g) - foreach(varnames) do k - d = get_var_dims(g,k) - v = get_var_handle(g,k) - for (len,dname) in zip(size(v),d) - if !occursin("bnd",dname) && !occursin("bounds",dname) - datts = get_var_attrs(g,dname) - offs = get(datts,"_ARRAY_OFFSET",0) - push!(dlist,(dname,offs,len)) - end - end - end - outd = Dict(d[1] => (ax = toaxis(d[1],g,d[2],d[3]), offs = d[2]) for d in dlist) - length(outd)==length(dlist) || throw(ArgumentError("All Arrays must have the same offset")) - outd -end - -function toaxis(dimname,g,offs,len) - axname = dimname - if !haskey(g,dimname) - return RangeAxis(dimname, 1:len) - end - ar = g[dimname] - aratts = get_var_attrs(g,dimname) - if uppercase(axname)=="TIME" && haskey(aratts,"units") - tsteps = timedecode(ar[:],aratts["units"],get(aratts,"calendar","standard")) - RangeAxis(dimname,tsteps[offs+1:end]) - elseif haskey(aratts,"_ARRAYVALUES") - vals = identity.(aratts["_ARRAYVALUES"]) - CategoricalAxis(axname,vals) - else - axdata = testrange(ar[offs+1:end]) - if eltype(axdata)<:AbstractString || (!issorted(axdata) && !issorted(axdata,rev=true)) - CategoricalAxis(axname,axdata) - else - RangeAxis(axname,axdata) - end - end -end -propfromattr(attr) = Dict{String,Any}(filter(i->i[1]!=="_ARRAY_DIMENSIONS",attr)) - -"Test if data in x can be approximated by a step range" -function testrange(x) - r = range(first(x),last(x),length=length(x)) - all(i->isapprox(i...),zip(x,r)) ? r : x -end - -testrange(x::AbstractArray{<:AbstractString}) = x - -function resolve_stars(s, res=String[]) - s2 = splitpath(s) - istar = findfirst(i->occursin('*',i),s2) - if istar===nothing - return push!(res,s) - end - p = joinpath(s2[1:istar-1]...) - allfiles = readdir(p) - allfiles = filter(i->match(wildcard_to_regex(s2[istar]), i)!==nothing, allfiles) - foreach(i->resolve_stars(joinpath(p,i,s2[istar+1:end]...),res),allfiles) - return res -end -function wildcard_to_regex(s) - if startswith(s,"*") - s = string("[^.]",s) - end - Regex(replace(s,'*'=>".*")) -end - -open_mfdataset(g::AbstractString; kwargs...) = open_mfdataset(resolve_stars(g); kwargs...) -open_mfdataset(g::Vector{<:AbstractString};kwargs...) 
= - merge_datasets(map(i->open_dataset(i;kwargs...), g)) - -function open_dataset(g; driver = :all) - g = to_dataset(g, driver=driver) - isempty(get_varnames(g)) && throw(ArgumentError("Zarr Group does not contain datasets.")) - dimlist = collectdims(g) - dnames = string.(keys(dimlist)) - varlist = filter(get_varnames(g)) do vn - upname = uppercase(vn) - !occursin("BNDS",upname) && !occursin("BOUNDS",upname) && !any(i->isequal(upname,uppercase(i)),dnames) - end - allcubes = OrderedDict{Symbol,AbstractCubeData}() - for vname in varlist - vardims = get_var_dims(g,vname) - iax = [dimlist[vd].ax for vd in vardims] - offs = [dimlist[vd].offs for vd in vardims] - subs = if all(iszero,offs) - nothing - else - ntuple(i->(offs[i]+1):(offs[i]+length(iax[i])),length(offs)) - end - ar = get_var_handle(g,vname) - att = get_var_attrs(g,vname) - allcubes[Symbol(vname)] = ESDLArray(iax,ar,propfromattr(att), cleaner=CleanMe[]) - end - sdimlist = Dict(Symbol(k)=>v.ax for (k,v) in dimlist) - Dataset(allcubes,sdimlist) -end -Base.getindex(x::Dataset;kwargs...) = subsetcube(x;kwargs...) -ESDLDataset(;kwargs...) = Dataset(ESDL.ESDLDefaults.cubedir[];kwargs...) - - -function Cube(ds::Dataset; joinname="Variable") - - dl = collect(keys(ds.axes)) - dls = string.(dl) - length(ds.cubes)==1 && return first(values(ds.cubes)) - #TODO This is an ugly workaround to merge cubes with different element types, - # There should bde a more generic solution - eltypes = map(eltype,values(ds.cubes)) - majtype = findmax(counter(eltypes))[2] - newkeys = Symbol[] - for k in keys(ds.cubes) - c = ds.cubes[k] - if all(axn->findAxis(axn,c)!==nothing,dls) && eltype(c)==majtype - push!(newkeys,k) - end - end - if length(newkeys)==1 - return ds.cubes[first(newkeys)] - else - varax = CategoricalAxis(joinname, string.(newkeys)) - return concatenateCubes([ds.cubes[k] for k in newkeys], varax) - end -end - - - -""" - function createdataset(DS::Type,axlist; kwargs...) - -Creates a new datacube with axes specified in `axlist`. Each axis must be a subtype -of `CubeAxis`. A new empty Zarr array will be created and can serve as a sink for -`mapCube` operations. - -### Keyword arguments - -* `folder=tempname()` location where the new cube is stored -* `T=Union{Float32,Missing}` data type of the target cube -* `chunksize = ntuple(i->length(axlist[i]),length(axlist))` chunk sizes of the array -* `chunkoffset = ntuple(i->0,length(axlist))` offsets of the chunks -* `persist::Bool=true` shall the disk data be garbage-collected when the cube goes out of scope? -* `overwrite::Bool=false` overwrite cube if it already exists -* `properties=Dict{String,Any}()` additional cube properties -* `fillvalue= T>:Missing ? defaultfillval(Base.nonmissingtype(T)) : nothing` fill value -* `datasetaxis="Variable"` special treatment of a categorical axis that gets written into separate zarr arrays -""" -function createdataset(DS, axlist; - path="", - persist = false, - T=Union{Float32,Missing}, - chunksize = ntuple(i->length(axlist[i]),length(axlist)), - chunkoffset = ntuple(i->0,length(axlist)), - overwrite::Bool=false, - properties=Dict{String,Any}(), - datasetaxis = "Variable", - kwargs... - ) - persist = !isempty(path) || persist - path = getsavefolder(path, persist) - check_overwrite(path, overwrite) - splice_generic(x::AbstractArray,i) = [x[1:(i-1)];x[(i+1:end)]] - splice_generic(x::Tuple,i) = (x[1:(i-1)]...,x[(i+1:end)]...) 
- myar = create_empty(DS, path) - if (iax = findAxis(datasetaxis,axlist)) !== nothing - groupaxis = axlist[iax] - axlist = splice_generic(axlist,iax) - chunksize = splice_generic(chunksize,iax) - chunkoffset = splice_generic(chunkoffset,iax) - else - groupaxis = nothing - end - foreach(axlist,chunkoffset) do ax,co - arrayfromaxis(myar,ax,co) - end - attr = properties - s = map(length,axlist) .+ chunkoffset - if all(iszero,chunkoffset) - subs = nothing - else - subs = ntuple(length(axlist)) do i - (chunkoffset[i]+1):(length(axlist[i].values)+chunkoffset[i]) - end - end - if groupaxis===nothing - cubenames = ["layer"] - else - cubenames = groupaxis.values - end - cleaner = CleanMe[] - persist || push!(cleaner,CleanMe(path,false)) - allcubes = map(cubenames) do cn - v = if allow_missings(myar) || !(T>:Missing) - add_var(myar, T, cn, map(length,axlist), map(axname,axlist), attr; chunksize = chunksize, kwargs...) - else - S = Base.nonmissingtype(T) - if !haskey(attr,"missing_value") - attr["missing_value"] = YAXArrayBase.defaultfillval(S) - end - v = add_var(myar, S, cn, map(length,axlist), map(axname,axlist), attr; chunksize = chunksize, kwargs...) - CFDiskArray(v,attr) - end - if subs !== nothing - za = view(za,subs...) - end - ESDLArray(axlist,v,propfromattr(attr),cleaner=cleaner) - end - if groupaxis===nothing - return allcubes[1] - else - return concatenateCubes(allcubes,groupaxis) - end -end - -function getsavefolder(name,persist) - if isempty(name) - name = persist ? split(tempname(),"/")[end] : tempname()[2:end] - joinpath(ESDLDefaults.workdir[],name) - else - occursin("/",name) ? name : joinpath(ESDLDefaults.workdir[],name) - end -end - -function check_overwrite(newfolder, overwrite) - if isdir(newfolder) || isfile(newfolder) - if overwrite - rm(newfolder, recursive=true) - else - error("$(newfolder) already exists, please pick another name or use `overwrite=true`") - end - end -end - -function arrayfromaxis(p,ax::CubeAxis,offs) - data, attr = dataattfromaxis(ax,offs) - attr["_ARRAY_OFFSET"]=offs - za = add_var(p, data, axname(ax), (axname(ax),), attr) - za -end - -prependrange(r::AbstractRange,n) = n==0 ? r : range(first(r)-n*step(r),last(r),length=n+length(r)) -function prependrange(r::AbstractVector,n) - if n==0 - return r - else - step = r[2]-r[1] - first = r[1] - step*n - last = r[1] - step - radd = range(first,last,length=n) - return [radd;r] - end -end - -defaultcal(::Type{<:TimeType}) = "standard" -defaultcal(::Type{<:DateTimeNoLeap}) = "noleap" -defaultcal(::Type{<:DateTimeAllLeap}) = "allleap" -defaultcal(::Type{<:DateTime360Day}) = "360_day" - -datetodatetime(vals::AbstractArray{<:Date}) = DateTime.(vals) -datetodatetime(vals) = vals - -function dataattfromaxis(ax::CubeAxis,n) - prependrange(ax.values,n), Dict{String,Any}() -end -# function dataattfromaxis(ax::CubeAxis,n) -# prependrange(1:length(ax.values),n), Dict{String,Any}("_ARRAYVALUES"=>collect(ax.values)) -# end -function dataattfromaxis(ax::CubeAxis{T},n) where T<:TimeType - data = timeencode(datetodatetime(ax.values),"days since 1980-01-01",defaultcal(T)) - prependrange(data,n), Dict{String,Any}("units"=>"days since 1980-01-01","calendar"=>defaultcal(T)) -end - -#The good old Cube function: -Cube(s::String;kwargs...) = Cube(open_dataset(s);kwargs...) -function Cube(;kwargs...) - if !isempty(ESDL.ESDLDefaults.cubedir[]) - Cube(ESDL.ESDLDefaults.cubedir[];kwargs...) - else - ESDC(;kwargs...) 
- end -end - -#Defining joins of Datasets -abstract type AxisJoin end -struct AllEqual <: AxisJoin - ax -end -struct SortedRanges <: AxisJoin - axlist - perm -end -blocksize(x::AllEqual) = 1 -blocksize(x::SortedRanges) = length(x.axlist) -getperminds(x::AllEqual) = 1:1 -getperminds(x::SortedRanges) = x.perm -wholeax(x::AllEqual) = x.ax -wholeax(x::SortedRanges) = reduce(vcat,x.axlist[x.perm]) -struct NewDim <: AxisJoin - newax -end -#Test for a range of categorical axes how to concatenate them -function analyse_axjoin_ranges(dimvallist) - firstax = first(dimvallist) - if all(isequal(firstax), dimvallist) - return AllEqual(firstax) - end - revorder = if all(issorted,dimvallist) - false - elseif all(i->issorted(i,rev=true),dimvallist) - true - else - error("Dimension values are not sorted") - end - function ltfunc(ax1,ax2) - min1,max1 = extrema(ax1) - min2,max2 = extrema(ax2) - if max1 < min2 - return true - elseif min1 > max2 - return false - else - error("Dimension ranges overlap") - end - end - sp = sortperm(dimvallist, rev = revorder, lt = ltfunc) - SortedRanges(dimvallist, sp) -end -using YAXArrayBase: YAXArrayBase, getdata, getattributes, yaxcreate -function create_mergedict(dimvallist) - allmerges = Dict{Symbol,Any}() - - for (axn,dimvals) in dimvallist - iscont = iscontdimval.(dimvals) - if all(iscont) - allmerges[axn] = analyse_axjoin_ranges(dimvals) - elseif any(iscont) - error("Mix of continous and non-continous values") - else - allmerges[axn] = analyse_axjoin_categorical(dimvals) - end - end - allmerges -end -function merge_datasets(dslist) - allaxnames = counter(Symbol) - for ds in dslist, k in keys(ds.axes) - push!(allaxnames,k) - end - dimvallist = Dict(ax=>map(i->i.axes[ax].values,dslist) for ax in keys(allaxnames)) - allmerges = create_mergedict(dimvallist) - repvars = counter(Symbol) - for ds in dslist, v in keys(ds.cubes) - push!(repvars,v) - end - tomergevars = filter(i->i[2]==length(dslist), repvars) - mergedvars=Dict{Symbol,Any}() - for v in keys(tomergevars) - dn = YAXArrayBase.dimnames(first(dslist)[v]) - howmerge = getindex.(Ref(allmerges), dn) - sizeblockar = map(blocksize,howmerge) - perminds = map(getperminds,howmerge) - @assert length(dslist) == prod(sizeblockar) - vcol = map(i->getdata(i[v]), dslist) - allatts = mapreduce(i->getattributes(i[v]),merge,dslist, init=Dict{String,Any}()) - aa = [vcol[i] for (i,_) in enumerate(Iterators.product(perminds...))] - dvals = map(wholeax, howmerge) - mergedvars[v] = yaxcreate(ESDLArray, ConcatDiskArray(aa), dn, dvals, allatts) - end - Dataset(;mergedvars...) -end - - -end diff --git a/src/ESDL.jl b/src/ESDL.jl index d3b35a20..b88cb511 100644 --- a/src/ESDL.jl +++ b/src/ESDL.jl @@ -5,58 +5,35 @@ Some info on the project... 
""" module ESDL -global const ESDLDefaults = ( - workdir = Ref("./"), - recal = Ref(false), - chunksize = Ref{Any}(:input), - max_cache = Ref(1e8), - cubedir = Ref(""), - subsetextensions = [], -) -global const workdir=ESDLDefaults.workdir -global const recal=ESDLDefaults.recal -function __init__() - ESDLDefaults.workdir[] = get(ENV,"ESDL_WORKDIR","./") - ESDLDefaults.max_cache[] = parse(Float64,get(ENV,"ESDL_MAX_CACHE","100")) * 1e6 - ESDLDefaults.cubedir[] = get(ENV,"ESDL_CUBEDIR","") -end -ESDLdir(x::String)=ESDLDefaults.workdir[]=x -recalculate(x::Bool)=ESDLDefaults.recal[]=x -recalculate()=ESDLDefaults.recal[] -ESDLdir()=ESDLDefaults.workdir[] -export ESDLdir -include("ESDLTools.jl") -include("Cubes/Cubes.jl") -include("DatasetAPI/Datasets.jl") -include("DAT/DAT.jl") -include("Proc/Proc.jl") +include("Proc.jl") +include("countrydict.jl") +include("esdc.jl") -using .ESDLTools: @reexport +using YAXArrays.YAXTools: @reexport @reexport using Dates: Date, DateTime -@reexport using IntervalSets: (..) -@reexport using .Cubes: cubeinfo, concatenateCubes, caxes, - subsetcube, readcubedata,renameaxis!, ESDLArray -@reexport using .Cubes.Axes: CubeAxis, RangeAxis, CategoricalAxis, +@reexport using YAXArrays: (..) +@reexport using YAXArrays: concatenatecubes, caxes, + subsetcube, readcubedata,renameaxis!, YAXArray +@reexport using YAXArrays: CubeAxis, RangeAxis, CategoricalAxis, getAxis -@reexport using .DAT: mapCube, getAxis, InDims, OutDims, Dataset, - CubeTable, cubefittable, fittable #From DAT module +@reexport using YAXArrays: mapCube, getAxis, InDims, OutDims, Dataset, + CubeTable, cubefittable, fittable, savecube, Cube, open_dataset #From DAT module @reexport using .Proc: removeMSC, gapFillMSC,normalizeTS, - getMSC, filterTSFFT, getNpY,savecube,loadcube,rmcube, + getMSC, filterTSFFT, getNpY, getMedSC, extractLonLats, cubefromshape, gapfillpoly, spatialinterp #From Proc module -@reexport using .Datasets: Dataset, Cube, open_dataset -@reexport using .ESDLTools: @loadOrGenerate # from ESDL Tools +@reexport using YAXArrays: @loadOrGenerate # from ESDL Tools + +@reexport using .ESDC: esdc, esdd -@deprecate saveCube(data, filename) savecube(data,filename) -@deprecate loadCube(filename) loadcube(filename) -@deprecate rmCube(filename) rmcube(filename) -@deprecate exportcube(data, filename; kwargs...) savecube(data, filename; backend=:netcdf, kwargs...) -@deprecate cubeproperties(x) getattributes(x) +include("cubeinfo.jl") -#include("precompile.jl") -#_precompile_() +using YAXArrays: YAXDefaults +function __init__() + push!(YAXDefaults.subsetextensions, replaceregion) +end end # module diff --git a/src/Proc/MSC.jl b/src/MSC.jl similarity index 85% rename from src/Proc/MSC.jl rename to src/MSC.jl index a8800c1e..3623bafe 100644 --- a/src/Proc/MSC.jl +++ b/src/MSC.jl @@ -1,4 +1,4 @@ -using Polynomials: polyfit +using Polynomials: fit using Statistics: quantile! function removeMSC(aout,ain,NpY::Integer) @@ -10,7 +10,7 @@ function removeMSC(aout,ain,NpY::Integer) end """ - removeMSC(c::AbstractCubeData) + removeMSC(c) Removes the mean annual cycle from each time series of a data cube. @@ -18,7 +18,7 @@ Removes the mean annual cycle from each time series of a data cube. **Output Axis** `Time`axis """ -function removeMSC(c::AbstractCubeData;kwargs...) +function removeMSC(c;kwargs...) NpY = getNpY(c) mapCube( removeMSC, @@ -31,7 +31,7 @@ function removeMSC(c::AbstractCubeData;kwargs...) 
diff --git a/src/Proc/MSC.jl b/src/MSC.jl
similarity index 85%
rename from src/Proc/MSC.jl
rename to src/MSC.jl
index a8800c1e..3623bafe 100644
--- a/src/Proc/MSC.jl
+++ b/src/MSC.jl
@@ -1,4 +1,4 @@
-using Polynomials: polyfit
+using Polynomials: fit
 using Statistics: quantile!
 
 function removeMSC(aout,ain,NpY::Integer)
@@ -10,7 +10,7 @@ function removeMSC(aout,ain,NpY::Integer)
 end
 
 """
-    removeMSC(c::AbstractCubeData)
+    removeMSC(c)
 
 Removes the mean annual cycle from each time series of a data cube.
 
@@ -18,7 +18,7 @@ Removes the mean annual cycle from each time series of a data cube.
 
 **Output Axis** `Time`axis
 """
-function removeMSC(c::AbstractCubeData;kwargs...)
+function removeMSC(c;kwargs...)
   NpY = getNpY(c)
   mapCube(
     removeMSC,
@@ -31,7 +31,7 @@
 end
 
 """
-    gapFillMSC(c::AbstractCubeData;complete_msc=false)
+    gapFillMSC(c;complete_msc=false)
 
 Fills missing values of each time series in a cube with the mean annual cycle.
 If `complete_msc` is set to `true`, the MSC will be filled with a polynomial
@@ -41,7 +41,7 @@ in case it still contains missing values due to systematic gaps.
 
 **Output Axis** `Time`axis
 """
-function gapFillMSC(c::AbstractCubeData;kwargs...)
+function gapFillMSC(c;kwargs...)
   NpY=getNpY(c)
   mapCube(gapFillMSC,c,NpY;indims=InDims("Time"),outdims=OutDims("Time"),kwargs...)
 end
@@ -58,12 +58,12 @@ end
 function fill_msc_poly!(tmsc)
   mscrep = [tmsc;tmsc;tmsc]
   n = length(tmsc)
-  a = gapfillpoly(mscrep, max_gap = n÷2, nbefore_after = max(3,n÷30))
+  a = gapfillpoly!(mscrep, max_gap = n÷2, nbefore_after = max(3,n÷30))
   tmsc .= view(a,(n+1):(n+n))
 end
 
-gapfillpoly(x::AbstractCubeData;max_gap=30,nbefore_after=10, polyorder = 2) =
-  mapslices(gapfillpoly,x,dims="Time",max_gap=max_gap, nbefore_after=nbefore_after, polyorder=polyorder)
+gapfillpoly(x;max_gap=30,nbefore_after=10, polyorder = 2) =
+  mapslices(gapfillpoly!,x,dims="Time",max_gap=max_gap, nbefore_after=nbefore_after, polyorder=polyorder)
 
 """
-    fillgaps_poly(x;max_gap=30,nbefore_after=10, polyorder = 2)
+    gapfillpoly!(x;max_gap=30,nbefore_after=10, polyorder = 2)
 
@@ -73,7 +73,7 @@ the algorithm uses `nbefore_after` time steps before and after the gap to fit a
-polynomial of order `polyorder`. The missing alues are then replaced by the fitted
+polynomial of order `polyorder`. The missing values are then replaced by the fitted
 polynomial.
 """
-function gapfillpoly(x;max_gap=30,nbefore_after=10, polyorder = 2)
+function gapfillpoly!(x;max_gap=30,nbefore_after=10, polyorder = 2)
   x = replace(i->(!ismissing(i) && isfinite(i)) ? i : missing,x)
   a = copy(x)
   workx = Float64[]
@@ -98,7 +98,7 @@ function gapfillpoly(x;max_gap=30,nbefore_after=10, polyorder = 2)
     end
   end
   if length(workx)>polyorder
-    p = polyfit(workx,worky,polyorder)
+    p = fit(workx,worky,polyorder)
     for idx in gapstart:(gapstop-1)
       a[idx] = p(idx)
     end
@@ -113,7 +113,7 @@ end
 
 
 """
-    getMSC(c::AbstractCubeData)
+    getMSC(c)
 
 Returns the mean annual cycle from each time series.
 
@@ -122,13 +122,17 @@ Returns the mean annual cycle from each time series.
 
 **Output Axis** `MSC`axis
 """
-function getMSC(c::AbstractCubeData;kwargs...)
+function getMSC(c;kwargs...)
   N = getNpY(c)
-  outdims = OutDims(RangeAxis("MSC",DateTime(1900):Day(ceil(Int,366/N)):DateTime(1900,12,31,23,59,59)))
+  outdims = OutDims(RangeAxis("MSC",DateTime(1900):Day(ceil(Int,366/N)):DateTime(1900,12,31,23,59,59)),
+    outtype = mscouttype(eltype(c)))
   indims = InDims("Time")
   mapCube(getMSC,c,getNpY(c);indims=indims,outdims=outdims,kwargs...)
 end
 
+mscouttype(T) = Base.nonmissingtype(T)
+mscouttype(::Type{<:Union{Missing,Integer}}) = Float64
+
 function getMSC(aout::AbstractVector,ain::AbstractVector,NpY;imscstart::Int=1)
     nmsc = zeros(Int,NpY)
     fillmsc(imscstart,aout,nmsc,ain,NpY)
@@ -160,7 +164,7 @@ function replaceMisswithMSC(msc::AbstractVector,xin::AbstractArray,xout::Abstrac
 end
 
 """
-    getMedMSC(c::AbstractCubeData)
+    getMedSC(c)
 
 Returns the median annual cycle from each time series.
 
@@ -168,9 +172,10 @@ Returns the median annual cycle from each time series.
 
 **Output Axis** `MSC`axis
 """
-function getMedSC(c::AbstractCubeData;kwargs...)
+function getMedSC(c;kwargs...)
   N = getNpY(c)
-  outdims = OutDims(RangeAxis("MSC",DateTime(1900):Day(ceil(Int,366/N)):DateTime(1900,12,31,23,59,59)))
+  outdims = OutDims(RangeAxis("MSC",DateTime(1900):Day(ceil(Int,366/N)):DateTime(1900,12,31,23,59,59)),
+    outtype = mscouttype(eltype(c)))
   indims = InDims("Time")
   mapCube(getMedSC,c;indims=indims,outdims=outdims,kwargs...)
end diff --git a/src/Proc/Proc.jl b/src/Proc.jl similarity index 58% rename from src/Proc/Proc.jl rename to src/Proc.jl index 3b7db70f..1304c67c 100644 --- a/src/Proc/Proc.jl +++ b/src/Proc.jl @@ -1,17 +1,16 @@ module Proc -using ..Cubes: ESDLArray, AbstractCubeData, cubechunks, caxes -using ..Cubes.Axes: getAxis, findAxis, CategoricalAxis, axVal2Index, +using YAXArrays.Cubes: YAXArray, cubechunks, caxes +using YAXArrays.Cubes.Axes: getAxis, findAxis, CategoricalAxis, axVal2Index, RangeAxis, get_bb, axisfrombb, CubeAxis, axname -using ..DAT: mapCube, InDims, OutDims, NValid, AnyMissing -using ..Datasets: getsavefolder, Cube +using YAXArrays.DAT: mapCube, InDims, OutDims, NValid, AnyMissing +using YAXArrays.Datasets: getsavefolder, Cube using Dates: year, Date, DateTime """ - getNpY(cube::AbstractCubeData) - getNpY(cube::InputCube) + getNpY(cube) Get the number of time steps per year """ -function getNpY(cube::AbstractCubeData) +function getNpY(cube) timax = getAxis("Time",cube) years = year.(timax.values) years[end] > years[1] + 1 || error("Must have at least 3 years to estimate number of time steps per year") @@ -20,9 +19,9 @@ end include("MSC.jl") include("Stats.jl") -include("CubeIO.jl") include("TSDecomposition.jl") include("remap.jl") include("Shapes.jl") +include("extractlonlats.jl") end diff --git a/src/Proc/CubeIO.jl b/src/Proc/CubeIO.jl deleted file mode 100644 index 79522031..00000000 --- a/src/Proc/CubeIO.jl +++ /dev/null @@ -1,145 +0,0 @@ -using Base.Iterators: Iterators, product -using DataStructures: OrderedDict - -""" - savecube(cube,name::String) - -Save a [`ESDLArray`](@ref) to the folder `name` in the ESDL working directory. -""" -function savecube(c::AbstractCubeData, name::AbstractString; - chunksize = Dict(), - max_cache = 1e8, - backend = :zarr, - backendargs... -) -allax = caxes(c) -if !isa(chunksize,AbstractDict) && !isa(chunksize,NamedTuple) - @warn "Chunksize must be provided as a Dict mapping axis names to chunk size in the future" - chunksize = OrderedDict(axname(i[1])=>i[2] for i in zip(allax,chunksize)) -end - firstaxes = sort!([findAxis(String(k),allax) for k in keys(chunksize)]) - lastaxes = setdiff(1:length(allax),firstaxes) - allax = allax[[firstaxes;lastaxes]] - dl = cumprod(length.(caxes(c))) - isplit = findfirst(i->i>max_cache/sizeof(eltype(c)),dl) - isplit isa Nothing && (isplit=length(dl)+1) - forcesingle = (isplit+1)1 && println("Forcing single core processing because of bad chunk size") - o = mapCube(cop,c,indims=indims, outdims=outdims,ispar=false,max_cache=max_cache) - else - o = mapCube(cop,c,indims=indims, outdims=outdims,max_cache=max_cache) - end -end - -function loadcube(s) - Cube(getsavefolder(s, true)) -end - -function rmcube(s) - p = getsavefolder(s, true) - if isfile(p) || isdir(p) - rm(p, recursive=true) - end -end - - -function toPointAxis(aout,ain,loninds,latinds) - iout = 1 - for (ilon,ilat) in zip(loninds,latinds) - aout[iout]=ain[ilon,ilat] - iout+=1 - end -end - -""" - extractLonLats(c::AbstractCubeData,pl::Matrix) - -Extracts a list of longitude/latitude coordinates from a data cube. The coordinates -are specified through the matrix `pl` where `size(pl)==(N,2)` and N is the number -of extracted coordinates. Returns a data cube without `LonAxis` and `LatAxis` but with a -`SpatialPointAxis` containing the input locations. -""" -function extractLonLats(c::AbstractCubeData,pl::Matrix;kwargs...) 
- size(pl,2)==2 || error("Coordinate list must have exactly 2 columns") - axlist=caxes(c) - ilon=findAxis("Lon",axlist) - ilat=findAxis("Lat",axlist) - ilon>0 || error("Input cube must contain a LonAxis") - ilat>0 || error("input cube must contain a LatAxis") - lonax=axlist[ilon] - latax=axlist[ilat] - pointax = CategoricalAxis("SpatialPoint", [(pl[i,1],pl[i,2]) for i in 1:size(pl,1)]) - loninds = map(ll->axVal2Index(lonax,ll[1]),pointax.values) - latinds = map(ll->axVal2Index(latax,ll[2]),pointax.values) - incubes=InDims("Lon","Lat") - outcubes=OutDims(pointax) - y=mapCube(toPointAxis,c,loninds,latinds;indims=incubes,outdims=outcubes,max_cache=1e8,kwargs...) -end - -function writefun(xout,xin::AbstractArray{Union{Missing,T}},a,nd,cont_loop,filename) where T - - #TODO replace -9999.0 this with a default missing value - x = map(ix->ismissing(ix) ? convert(T,-9999.0) : ix,xin) - - count_vec = fill(-1,nd) - start_vec = fill(1,nd) - - used_syms = Symbol[] - for (k,v) in cont_loop - count_vec[k] = 1 - start_vec[k] = a[Symbol(v)][1] - push!(used_syms,Symbol(v)) - end - - splinds = Iterators.filter(i->!in(i,used_syms),keys(a)) - vn = join(string.([a[s][2] for s in splinds]),"_") - isempty(vn) && (vn="layer") - ncwrite(x,filename,vn,start=start_vec,count=count_vec) -end - -global const projection = Dict( -"grid_mapping_name" => "transverse_mercator", -"longitude_of_central_meridian" => -9. , -"false_easting" => 500000. , -"false_northing" => 0. , -"latitude_of_projection_origin" => 0. , -"scale_factor_at_central_meridian" => 0.9996, -"long_name" => "CRS definition", -"longitude_of_prime_meridian" => 0., -"semi_major_axis" => 6378137., -"inverse_flattening" => 298.257223563 , -"spatial_ref" => "PROJCS[\"WGS 84 / UTM zone 29N\",GEOGCS[\"WGS 84\", -DATUM[\"WGS_1984\",SPHEROID[\"WGS 84\",6378137,298.257223563, -AUTHORITY[\"EPSG\",\"7030\"]],AUTHORITY[\"EPSG\",\"6326\"]], -PRIMEM[\"Greenwich\",0,AUTHORITY[\"EPSG\",\"8901\"]], -UNIT[\"degree\",0.0174532925199433,AUTHORITY[\"EPSG\",\"9122\"]], -AUTHORITY[\"EPSG\",\"4326\"]], -PROJECTION[\"Transverse_Mercator\"], -PARAMETER[\"latitude_of_origin\",0], -PARAMETER[\"central_meridian\",-9], -PARAMETER[\"scale_factor\",0.9996], -PARAMETER[\"false_easting\",500000], -PARAMETER[\"false_northing\",0], -UNIT[\"metre\",1,AUTHORITY[\"EPSG\",\"9001\"]], -AXIS[\"Easting\",EAST], -AXIS[\"Northing\",NORTH], -AUTHORITY[\"EPSG\",\"32629\"]]", -"GeoTransform" => "722576.2320803495 20 0 4115483.464603715 0 -20 ") - -global const epsg4326=Dict( - "grid_mapping_name" => "latitude_longitude", - "long_name" => "CRS definition", - "longitude_of_prime_meridian" => 0., - "semi_major_axis" => 6378137., - "inverse_flattening" => 298.257223563, - "spatial_ref" => "GEOGCS[\"WGS 84\",DATUM[\"WGS_1984\",SPHEROID[\"WGS 84\",6378137,298.257223563,AUTHORITY[\"EPSG\",\"7030\"]],AUTHORITY[\"EPSG\",\"6326\"]],PRIMEM[\"Greenwich\",0],UNIT[\"degree\",0.0174532925199433],AUTHORITY[\"EPSG\",\"4326\"]]", - "GeoTransform" => "-98.68712696894481 0.0001796630568239077 0 20.69179551612753 0 -0.0001796630568239077 " -) diff --git a/src/Proc/Shapes.jl b/src/Shapes.jl similarity index 95% rename from src/Proc/Shapes.jl rename to src/Shapes.jl index b7558bf5..ac17f31b 100644 --- a/src/Proc/Shapes.jl +++ b/src/Shapes.jl @@ -44,7 +44,7 @@ function cubefromshape_fraction(shapepath,lonaxis,lataxis;labelsym=nothing, T=Fl newax = CategoricalAxis(labelsym,[pp[l] for l in labelsleft]) end - return ESDLArray(CubeAxis[lonaxis, lataxis, newax], allout) + return YAXArray(CubeAxis[lonaxis, lataxis, newax], allout) end 
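The constructors above are reached through the `cubefromshape` wrapper documented further below; a usage sketch (the shapefile path and label column are hypothetical):

```julia
using ESDL

# Rasterize polygons from a shapefile onto the lon/lat grid of an existing cube `c`
mask = cubefromshape("countries.shp", c; labelsym = :NAME)

# With a samplefactor, rasterize at 10-fold resolution and return per-pixel area fractions
frac = cubefromshape("countries.shp", c; samplefactor = 10)
```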
@@ -59,7 +59,7 @@ function cubefromshape_single(shapepath, lonaxis, lataxis; labelsym = nothing, T labelsleft = collect(skipmissing(unique(outmat))) properties = getlabeldict(shapepath,labelsym,T,labelsleft) - return ESDLArray(CubeAxis[lonaxis, lataxis], outmat,properties) + return YAXArray(CubeAxis[lonaxis, lataxis], outmat,properties) end """ @@ -71,7 +71,7 @@ If the shapefile comes with additional labels in a .dbf file, the labels can be providing the label to choose as a symbol. If a `samplefactor` is supplied, the polygons will be rasterized on an n-times higher resolution and the fraction of area within each polygon will be returned. """ -cubefromshape(shapepath, c::AbstractCubeData; kwargs...) = cubefromshape(shapepath, getAxis("Lon",c), getAxis("Lat",c);kwargs...) +cubefromshape(shapepath, c; kwargs...) = cubefromshape(shapepath, getAxis("Lon",c), getAxis("Lat",c);kwargs...) function cubefromshape(args...; samplefactor=nothing, kwargs...) if samplefactor===nothing cubefromshape_single(args...; kwargs...) @@ -80,7 +80,7 @@ function cubefromshape(args...; samplefactor=nothing, kwargs...) end end -function prune_labels!(c::ESDLArray) +function prune_labels!(c::YAXArray) if haskey(c.properties,"labels") labelsleft = Set(skipmissing(unique(c.data))) dold = c.properties["labels"] diff --git a/src/Proc/Stats.jl b/src/Stats.jl similarity index 85% rename from src/Proc/Stats.jl rename to src/Stats.jl index f919e008..0234aee5 100644 --- a/src/Proc/Stats.jl +++ b/src/Stats.jl @@ -5,7 +5,7 @@ using WeightedOnlineStats: WeightedAdaptiveHist """ - normalizeTS(c::AbstractCubeData) + normalizeTS(c) Normalize a time series to zero mean and unit variance @@ -13,7 +13,7 @@ Normalize a time series to zero mean and unit variance **Output Axes** `TimeAxis` """ -function normalizeTS(c::AbstractCubeData;kwargs...) +function normalizeTS(c;kwargs...) mapCube(normalizeTS,c;indims=InDims("Time",filter=NValid(3)),outdims=OutDims("Time"),kwargs...) end function normalizeTS(xout::AbstractVector,xin::AbstractVector) @@ -24,12 +24,12 @@ function normalizeTS(xout::AbstractVector,xin::AbstractVector) end """ - quantile(c::AbstractCubeData,p=[0.25,0.5,0.75];by=(),nbins=100) + quantile(c,p=[0.25,0.5,0.75];by=(),nbins=100) Computes the quantile of a data cube based on fitting a Histogram on the data using an Online statistic. """ -function quantile(c::AbstractCubeData,p=[0.25,0.5,0.75];by=(),nbins=100) +function quantile(c::YAXArray,p=[0.25,0.5,0.75];by=(),nbins=100) if any(i->isa(i,CategoricalAxis{<:Any,:Hist}),caxes(c)) && any(i->isa(i,RangeAxis{<:Any,:Bin}),caxes(c)) if isa(p,Number) od = OutDims() diff --git a/src/Proc/TSDecomposition.jl b/src/TSDecomposition.jl similarity index 96% rename from src/Proc/TSDecomposition.jl rename to src/TSDecomposition.jl index 4996a6f3..1b022772 100644 --- a/src/Proc/TSDecomposition.jl +++ b/src/TSDecomposition.jl @@ -23,7 +23,7 @@ end mirror(i,l)=l-i+2 """ - filterTSFFT(c::AbstractCubeData) + filterTSFFT(c) Filter each time series using a Fourier filter and return the decomposed series in 4 time windows (Trend, Long-Term Variability, Annual Cycle, Fast Oscillations) @@ -33,7 +33,7 @@ in 4 time windows (Trend, Long-Term Variability, Annual Cycle, Fast Oscillations **Output Axes** `Time`axis, `Scale`axis """ -function filterTSFFT(c::AbstractCubeData;kwargs...) +function filterTSFFT(c;kwargs...) 
indims = InDims(TimeAxis,filter=AnyMissing()) outdims = OutDims(TimeAxis,(c,p)->ScaleAxis(["Trend", "Long-Term Variability", "Annual Cycle", "Fast Oscillations"])) ntime = length(getAxis("Time",c)) diff --git a/src/DatasetAPI/countrydict.jl b/src/countrydict.jl similarity index 99% rename from src/DatasetAPI/countrydict.jl rename to src/countrydict.jl index 9739ae13..2b2cea0a 100644 --- a/src/DatasetAPI/countrydict.jl +++ b/src/countrydict.jl @@ -306,7 +306,7 @@ |ZMB|Zambia |ZWE|Zimbabwe """ -const known_regions = Dict{String,NTuple{4,Float64}}( +known_regions = Dict{String,NTuple{4,Float64}}( "World"=>(-180.0,-90.0,179.999999999999,90.0), "Africa"=>(-17.0,-40.0,51.0,40.0), "Asia"=>(30.0,5.0,179.5,80.0), @@ -885,9 +885,6 @@ const known_regions = Dict{String,NTuple{4,Float64}}( "Zimbabwe" => (25.219369751374444,-22.397339782252487,33.0427681886182,-15.614808044416833), "ZWE" => (25.219369751374444,-22.397339782252487,33.0427681886182,-15.614808044416833), ) - -using ..ESDL: ESDLDefaults - function replaceregion(kwargs) if haskey(kwargs,:region) reg = kwargs[:region] @@ -898,5 +895,3 @@ function replaceregion(kwargs) kwargs[:lat] = lat1..lat2 end end - -push!(ESDLDefaults.subsetextensions, replaceregion) diff --git a/src/cubeinfo.jl b/src/cubeinfo.jl new file mode 100644 index 00000000..af190a27 --- /dev/null +++ b/src/cubeinfo.jl @@ -0,0 +1,54 @@ +using Markdown +struct YAXVarInfo + project::String + longname::String + units::String + url::String + comment::String + reference::String +end +Base.isless(a::YAXVarInfo, b::YAXVarInfo) = isless(string(a.project, a.longname),string(b.project, b.longname)) + +import Base.show +function show(io::IO,::MIME"text/markdown",v::YAXVarInfo) + un=v.units + url=v.url + re=v.reference + pr = v.project + ln = v.longname + co = v.comment + mdt=md""" +### $ln +*$(co)* + +* **Project** $(pr) +* **units** $(un) +* **Link** $(url) +* **Reference** $(re) +""" + mdt[3].items[1][1].content[3]=[" $pr"] + mdt[3].items[2][1].content[3]=[" $un"] + mdt[3].items[3][1].content[3]=[" $url"] + mdt[3].items[4][1].content[3]=[" $re"] + show(io,MIME"text/markdown"(),mdt) +end +show(io::IO,::MIME"text/markdown",v::Vector{YAXVarInfo})=foreach(x->show(io,MIME"text/markdown"(),x),v) + +""" + cubeinfo(cube) + +Shows the metadata and citation information on variables contained in a cube. 
+
+"""
+function cubeinfo(ds::YAXArray, variable="unknown")
+  p = ds.properties
+  vi=YAXVarInfo(
+    get(p,"project_name", "unknown"),
+    get(p,"long_name",variable),
+    get(p,"units","unknown"),
+    get(p,"url","no link"),
+    get(p,"comment",variable),
+    get(p,"references","no reference")
+    )
+end
+
+export cubeinfo
diff --git a/src/esdc.jl b/src/esdc.jl
new file mode 100644
index 00000000..898559f7
--- /dev/null
+++ b/src/esdc.jl
@@ -0,0 +1,69 @@
+module ESDC
+import YAXArrays
+import YAXArrays.Datasets: Dataset, Cube, open_dataset
+using Zarr: S3Store, zopen, aws_config
+using Dates: Dates, now
+export esdc, esdd
+global aws, cubesdict
+
+function __init__()
+  global aws, cubesdict
+  aws = aws_config(creds=nothing, region="eu-de", service_name="obs", service_host="otc.t-systems.com")
+  cubesdict = Dict(
+    ("low","ts","global") => ("obs-esdc-v2.0.0","esdc-8d-0.25deg-184x90x90-2.0.0.zarr"),
+    ("low","map","global") => ("obs-esdc-v2.0.0","esdc-8d-0.25deg-1x720x1440-2.0.0.zarr"),
+    ("high","ts","global") => ("obs-esdc-v2.0.0","esdc-8d-0.083deg-184x270x270-2.0.0.zarr"),
+    ("high","map","global") => ("obs-esdc-v2.0.0","esdc-8d-0.083deg-1x2160x4320-2.0.0.zarr"),
+    ("low","ts","Colombia") => ("obs-esdc-v2.0.1","Cube_2019lowColombiaCube_184x60x60.zarr"),
+    ("low","map","Colombia") => ("obs-esdc-v2.0.1","Cube_2019lowColombiaCube_1x336x276.zarr/"),
+    ("high","ts","Colombia") => ("obs-esdc-v2.0.1","Cube_2019highColombiaCube_184x120x120.zarr"),
+    ("high","map","Colombia") => ("obs-esdc-v2.0.1","Cube_2019highColombiaCube_1x3360x2760.zarr"),
+  )
+  if isdir("/home/jovyan/work/datacube/ESDCv2.0.0/esdc-8d-0.25deg-184x90x90-2.0.0.zarr/")
+    YAXArrays.YAXDefaults.cubedir[] = "/home/jovyan/work/datacube/ESDCv2.0.0/esdc-8d-0.25deg-184x90x90-2.0.0.zarr/"
+  end
+end
+
+"""
+    function esdd(;kwargs...)
+
+Opens a datacube from the Telecom Object Storage Service as a Dataset. This works on any system, but
+might involve some latency depending on connection speed. One can either specify a `bucket`
+and `store` or pick a resolution, chunking and cube region.
+
+### Keyword arguments
+
+  * `bucket=nothing` specify an OBS bucket, for example "obs-esdc-v2.0.0"
+  * `store=""` specify the root path of the cube, for example "esdc-8d-0.25deg-184x90x90-2.0.0.zarr"
+  * `res="low"` pick a datacube resolution (`"low"` or `"high"`)
+  * `chunks="ts"` choose a chunking (`"ts"` for time series access or `"map"` for spatial analyses)
+  * `region="global"` choose a datacube (either `"global"` or `"Colombia"`)
+
+"""
+function esdd(;bucket=nothing, store="", res="low", chunks="ts", region="global")
+  if bucket===nothing
+    bucket, store = cubesdict[(res,chunks,region)]
+  end
+  open_dataset(zopen(S3Store(bucket,store,2,aws),consolidated=true))
+end
+
+"""
+    function esdc(;kwargs...)
+
+Opens a datacube from the Telecom Object Storage Service as a single merged data cube, with
+all variables concatenated along a `Variable` axis (use [`esdd`](@ref) to keep them separate
+as a `Dataset`). This works on any system, but might involve some latency depending on
+connection speed. One can either specify a `bucket` and `store` or pick a resolution, chunking and cube region.
+
+### Keyword arguments
+
+  * `bucket=nothing` specify an OBS bucket, for example "obs-esdc-v2.0.0"
+  * `store=""` specify the root path of the cube, for example "esdc-8d-0.25deg-184x90x90-2.0.0.zarr"
+  * `res="low"` pick a datacube resolution (`"low"` or `"high"`)
+  * `chunks="ts"` choose a chunking (`"ts"` for time series access or `"map"` for spatial analyses)
+  * `region="global"` choose a datacube (either `"global"` or `"Colombia"`)
+
+"""
+esdc(;kwargs...) = Cube(esdd(;kwargs...))
+
+end # module
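The two entry points defined above differ only in their return type; a usage sketch (requires network access to the OBS buckets, and the chosen keys are just examples from `cubesdict`):

```julia
using ESDL

# As a Dataset: variables stay separate arrays
ds = esdd(res = "low", chunks = "map", region = "global")

# As a single cube with all variables merged along a Variable axis
c = esdc(res = "high", chunks = "ts", region = "Colombia")

# Or address a bucket and store directly
c2 = esdc(bucket = "obs-esdc-v2.0.0", store = "esdc-8d-0.25deg-184x90x90-2.0.0.zarr")
```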
diff --git a/src/extractlonlats.jl b/src/extractlonlats.jl
new file mode 100644
index 00000000..bf9144db
--- /dev/null
+++ b/src/extractlonlats.jl
@@ -0,0 +1,32 @@
+function toPointAxis(aout,ain,loninds,latinds)
+  iout = 1
+  for (ilon,ilat) in zip(loninds,latinds)
+    aout[iout]=ain[ilon,ilat]
+    iout+=1
+  end
+end
+
+"""
+    extractLonLats(c,pl::Matrix)
+
+Extracts a list of longitude/latitude coordinates from a data cube. The coordinates
+are specified through the matrix `pl` where `size(pl)==(N,2)` and N is the number
+of extracted coordinates. Returns a data cube without `LonAxis` and `LatAxis` but with a
+`SpatialPointAxis` containing the input locations.
+"""
+function extractLonLats(c,pl::Matrix;kwargs...)
+  size(pl,2)==2 || error("Coordinate list must have exactly 2 columns")
+  axlist=caxes(c)
+  ilon=findAxis("Lon",axlist)
+  ilat=findAxis("Lat",axlist)
+  ilon === nothing && error("Input cube must contain a LonAxis")
+  ilat === nothing && error("Input cube must contain a LatAxis")
+  lonax=axlist[ilon]
+  latax=axlist[ilat]
+  pointax = CategoricalAxis("SpatialPoint", [(pl[i,1],pl[i,2]) for i in 1:size(pl,1)])
+  loninds = map(ll->axVal2Index(lonax,ll[1]),pointax.values)
+  latinds = map(ll->axVal2Index(latax,ll[2]),pointax.values)
+  incubes=InDims("Lon","Lat")
+  outcubes=OutDims(pointax)
+  y=mapCube(toPointAxis,c,loninds,latinds;indims=incubes,outdims=outcubes,max_cache=1e8,kwargs...)
+end
diff --git a/src/Proc/remap.jl b/src/remap.jl
similarity index 84%
rename from src/Proc/remap.jl
rename to src/remap.jl
index 64c7c18b..bb0f882e 100644
--- a/src/Proc/remap.jl
+++ b/src/remap.jl
@@ -2,12 +2,12 @@ using DiskArrayTools: InterpolatedDiskArray
 using Interpolations: Linear, Flat, Constant, NoInterp
 using DiskArrays: eachchunk, GridChunks
 """
-    spatialinterp(c::AbstractCubeData,newlons::AbstractRange,newlats::AbstractRange;order=Linear(),bc = Flat())
+    spatialinterp(c,newlons::AbstractRange,newlats::AbstractRange;order=Linear(),bc = Flat())
 """
-function spatialinterp(c::AbstractCubeData,newlons::AbstractRange,newlats::AbstractRange;order=Linear(),bc = Flat())
+function spatialinterp(c,newlons::AbstractRange,newlats::AbstractRange;order=Linear(),bc = Flat())
   interpolatecube(c,Dict("Lon"=>newlons, "Lat"=>newlats), order = Dict("Lon"=>Linear(),"Lat"=>Linear()))
 end
-spatialinterp(c::AbstractCubeData,newlons::CubeAxis,newlats::CubeAxis;kwargs...)=
+spatialinterp(c,newlons::CubeAxis,newlats::CubeAxis;kwargs...)=
   spatialinterp(c,newlons.values,newlats.values;kwargs...)
 
 function expandchunksizes(c,newaxdict)
@@ -28,7 +28,7 @@ spatialinterp(c::AbstractCubeData,newlons::CubeAxis,newlats::CubeAxis;kwargs...)
newchunks = GridChunks(size(c),newcs, offset = newco) end -function interpolatecube(c::AbstractCubeData, +function interpolatecube(c, newaxes::Dict, newchunks = expandchunksizes(c,newaxes); order=Dict() @@ -55,7 +55,7 @@ function interpolatecube(c::AbstractCubeData, RangeAxis(axname(ax),val) end end - ESDLArray(newax,ar,c.properties, cleaner = c.cleaner) + YAXArray(newax,ar,c.properties, cleaner = c.cleaner) end function getinterpinds(oldvals::AbstractRange, newvals::AbstractRange) diff --git a/test/Project.toml b/test/Project.toml new file mode 100644 index 00000000..35be4603 --- /dev/null +++ b/test/Project.toml @@ -0,0 +1,16 @@ +[deps] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +DiskArrayTools = "fcd2136c-9f69-4db6-97e5-f31981721d63" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +ESDL = "359177bc-a543-11e8-11b7-bb015dba3358" +IterTools = "c8e1da08-722c-5040-9ed9-7db0dc04731e" +NetCDF = "30363a11-5582-574a-97bb-aa9a979735b9" +OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e" +RDatasets = "ce6b1742-4840-55fa-b093-852dadbb1d8b" +Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" +StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" +WeightedOnlineStats = "bbac0a1f-7c9d-5672-960b-c6ca726e5d5d" +YAXArrays = "c21b50f5-aa40-41ea-b809-c0f5e47bfa5c" diff --git a/test/access.jl b/test/access.jl index 7f34c369..590d9efe 100644 --- a/test/access.jl +++ b/test/access.jl @@ -46,7 +46,7 @@ data2=readcubedata(d2) @test caxes(data1)[1:2]==CubeAxis[RangeAxis("lon",10.125:0.25:10.875),RangeAxis("lat",50.875:-0.25:50.125)] tax = caxes(data1)[3] -@test ESDL.Cubes.Axes.axsym(tax)==:time +@test YAXArrays.Cubes.Axes.axsym(tax)==:time @test tax.values[1] == Date(2002,1,5) @test tax.values[end] == Date(2008,12,30) @test length(tax.values) == 7*46 @@ -54,7 +54,7 @@ tax = caxes(data1)[3] @test caxes(data2)[[1,2,4]]==CubeAxis[RangeAxis("lon",10.125:0.25:10.875),RangeAxis("lat",50.875:-0.25:50.125),CategoricalAxis("Variable",["air_temperature_2m","gross_primary_productivity"])] tax = caxes(data2)[3] -@test ESDL.Cubes.Axes.axsym(tax)==:time +@test YAXArrays.Cubes.Axes.axsym(tax)==:time @test tax.values[1] == Date(2002,1,5) @test tax.values[end] == Date(2008,12,30) @test length(tax.values) == 7*46 @@ -87,7 +87,7 @@ using DiskArrayTools: DiskArrayStack data1=readcubedata(d) #Test saving cubes dire=tempname() - ESDLdir(dire) + YAXdir(dire) savecube(data1,"mySavedCube") diff --git a/test/analysis.jl b/test/analysis.jl index d3fa8cdf..1f2477f7 100644 --- a/test/analysis.jl +++ b/test/analysis.jl @@ -5,7 +5,7 @@ import Base.Iterators using Distributed using Statistics addprocs(2) -@everywhere using ESDL, Statistics, ESDC, NetCDF +@everywhere using ESDL, Statistics, NetCDF @everywhere function sub_and_return_mean(xout1,xout2,xin) m=mean(skipmissing(xin)) diff --git a/test/runtests.jl b/test/runtests.jl index a8cc39e0..2043c87c 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,8 +1,8 @@ -using ESDL, ESDC, NetCDF +using ESDL, NetCDF, YAXArrays using Test newcubedir = mktempdir() -ESDLdir(newcubedir) +YAXdir(newcubedir) # Download Cube subset c = esdc() @@ -13,7 +13,7 @@ cgermany = c[ ] savecube(cgermany,"germanycube", chunksize=Dict("lon"=>20,"lat"=>20,"time"=>92)) -ESDL.ESDLDefaults.cubedir[] = joinpath(newcubedir,"germanycube") +YAXArrays.YAXDefaults.cubedir[] = joinpath(newcubedir,"germanycube") include("access.jl") include("analysis.jl") 
#include("artype.jl") diff --git a/test/table.jl b/test/table.jl index a155b400..1afdb613 100644 --- a/test/table.jl +++ b/test/table.jl @@ -25,30 +25,30 @@ iris = dataset("datasets", "iris") ) @test value(fitMeanWeight) ≈ mean(iris[!,:SepalWidth], weights(weightVector)) - meanBy = by(iris, :Species, :SepalWidth => mean) - cn = names(meanBy)[2] - fitMeanBy = fittable(DataFrames.eachrow(iris), Mean, :SepalWidth, by=(:Species,)) - for (key, value) in value(fitMeanBy) - @test value ≈ meanBy[meanBy[:Species] .== key, cn][1] - end + # meanBy = combine(groupby(iris, :Species), SepalWidth => mean) + # cn = names(meanBy)[2] + # fitMeanBy = fittable(DataFrames.eachrow(iris), Mean, :SepalWidth, by=(:Species,)) + # for (key, value) in value(fitMeanBy) + # @test value ≈ meanBy[meanBy[:Species] .== key, cn][1] + # end - meanByWeight = by(iris, :Species, - SepalWidth_mean = [:SepalWidth, :SepalLength] => - (x -> mean(x.SepalWidth, - weights(map(x -> (x - minSepalLength) / - (maxSepalLength - minSepalLength), - x.SepalLength) - ) - ) - ) - ) - fitMeanByWeight = fittable(DataFrames.eachrow(iris), WeightedMean, - :SepalWidth, by=(:Species,), - weight=(x -> (x.SepalLength - minSepalLength) / - (maxSepalLength - minSepalLength) - ) - ) - for (key, value) in value(fitMeanByWeight) - @test value ≈ meanByWeight[meanByWeight[!,:Species] .== key, :SepalWidth_mean][1] - end + # meanByWeight = by(iris, :Species, + # SepalWidth_mean = [:SepalWidth, :SepalLength] => + # (x -> mean(x.SepalWidth, + # weights(map(x -> (x - minSepalLength) / + # (maxSepalLength - minSepalLength), + # x.SepalLength) + # ) + # ) + # ) + # ) + # fitMeanByWeight = fittable(DataFrames.eachrow(iris), WeightedMean, + # :SepalWidth, by=(:Species,), + # weight=(x -> (x.SepalLength - minSepalLength) / + # (maxSepalLength - minSepalLength) + # ) + # ) + # for (key, value) in value(fitMeanByWeight) + # @test value ≈ meanByWeight[meanByWeight[!,:Species] .== key, :SepalWidth_mean][1] + # end end diff --git a/test/transform.jl b/test/transform.jl index 51d324a7..33510eee 100644 --- a/test/transform.jl +++ b/test/transform.jl @@ -12,11 +12,11 @@ d2 = subsetcube(c,variable="gross_primary_productivity",lon=(10,11),lat=(50,51), time=(Date("2002-01-01"),Date("2008-12-31"))) d3 = subsetcube(c,variable="net_ecosystem_exchange",lon=(10,11),lat=(50,51), time=(Date("2002-01-01"),Date("2007-12-31"))) -conccube = concatenateCubes([d1,d2],CategoricalAxis("NewAxis",["v1","v2"])) +conccube = concatenatecubes([d1,d2],CategoricalAxis("NewAxis",["v1","v2"])) @test size(conccube)==(4,4,322,2) @test ESDL.caxes(conccube)[1:3]==ESDL.caxes(d1) @test ESDL.caxes(conccube)[4]==CategoricalAxis("NewAxis",["v1","v2"]) -@test_throws ErrorException concatenateCubes([d1,d3],CategoricalAxis("NewAxis",["v1","v2"])) +@test_throws ErrorException concatenatecubes([d1,d3],CategoricalAxis("NewAxis",["v1","v2"])) dd1 = readcubedata(d1) dd2 = readcubedata(d2) aout = zeros(Float32,4,4,322,2) @@ -26,7 +26,7 @@ ddconc = readcubedata(conccube) @test ddconc.data[:,:,:,2] == dd2.data #@test isa(ESDL.Cubes.gethandle(conccube,(2,2,2,2)),ESDL.CubeAPI.CachedArrays.CachedArray) @test mean(ddconc.data,dims=1)[1,:,:,:]==mapslices(mean∘skipmissing,conccube,dims="Lon").data -conccube2 = concatenateCubes([dd1,dd2],CategoricalAxis("NewAxis",["v1","v2"])) +conccube2 = concatenatecubes([dd1,dd2],CategoricalAxis("NewAxis",["v1","v2"])) #@test isa(ESDL.Cubes.gethandle(conccube2,(2,2,2,2)),Tuple{AbstractArray,AbstractArray}) @test 
diff --git a/test/transform.jl b/test/transform.jl
index 51d324a7..33510eee 100644
--- a/test/transform.jl
+++ b/test/transform.jl
@@ -12,11 +12,11 @@ d2 = subsetcube(c,variable="gross_primary_productivity",lon=(10,11),lat=(50,51),
   time=(Date("2002-01-01"),Date("2008-12-31")))
 d3 = subsetcube(c,variable="net_ecosystem_exchange",lon=(10,11),lat=(50,51),
   time=(Date("2002-01-01"),Date("2007-12-31")))
-conccube = concatenateCubes([d1,d2],CategoricalAxis("NewAxis",["v1","v2"]))
+conccube = concatenatecubes([d1,d2],CategoricalAxis("NewAxis",["v1","v2"]))
 @test size(conccube)==(4,4,322,2)
 @test ESDL.caxes(conccube)[1:3]==ESDL.caxes(d1)
 @test ESDL.caxes(conccube)[4]==CategoricalAxis("NewAxis",["v1","v2"])
-@test_throws ErrorException concatenateCubes([d1,d3],CategoricalAxis("NewAxis",["v1","v2"]))
+@test_throws ErrorException concatenatecubes([d1,d3],CategoricalAxis("NewAxis",["v1","v2"]))
 dd1 = readcubedata(d1)
 dd2 = readcubedata(d2)
 aout = zeros(Float32,4,4,322,2)
@@ -26,7 +26,7 @@ ddconc = readcubedata(conccube)
 @test ddconc.data[:,:,:,2] == dd2.data
 #@test isa(ESDL.Cubes.gethandle(conccube,(2,2,2,2)),ESDL.CubeAPI.CachedArrays.CachedArray)
 @test mean(ddconc.data,dims=1)[1,:,:,:]==mapslices(mean∘skipmissing,conccube,dims="Lon").data
-conccube2 = concatenateCubes([dd1,dd2],CategoricalAxis("NewAxis",["v1","v2"]))
+conccube2 = concatenatecubes([dd1,dd2],CategoricalAxis("NewAxis",["v1","v2"]))
 #@test isa(ESDL.Cubes.gethandle(conccube2,(2,2,2,2)),Tuple{AbstractArray,AbstractArray})
 @test mean(ddconc.data,dims=1)[1,:,:,:]==mapslices(mean∘skipmissing,conccube2,dims="Lon").data
 end
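For context, a minimal sketch of the renamed `concatenatecubes` (lower-case, now provided by YAXArrays) as exercised by the updated tests; the cube setup is abbreviated from `test/transform.jl` and the axis labels are only illustrative:

```julia
using ESDL, YAXArrays

c = esdc()
d1 = subsetcube(c, variable="air_temperature_2m", lon=(10,11), lat=(50,51))
d2 = subsetcube(c, variable="gross_primary_productivity", lon=(10,11), lat=(50,51))

# Stack the two cubes along a new categorical axis;
# cubes with non-matching axes raise an ErrorException
conccube = concatenatecubes([d1, d2], CategoricalAxis("NewAxis", ["v1", "v2"]))
```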