Skip to content

Commit

Permalink
use multithreading in basic operations (#2647)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkamins authored Mar 19, 2021
1 parent 639e3b1 commit c0c8cd3
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 33 deletions.
7 changes: 6 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
values in `on` columns. These aspects of input data frames might affect the
order of rows produced in the output
([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612),
[#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622])
[#2622](https://github.com/JuliaData/DataFrames.jl/pull/2622))
* `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`,
`transform!`, and `combine` functions now use multiple threads in selected operations
([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)),
([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)),
([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574))

# DataFrames v0.22 Release Notes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using BenchmarkTools
using DataFrames
using PooledArrays
using Random

@show Threads.nthreads()

Random.seed!(1234)

# Reference frames of identical shape with integer, string, and
# pooled-string columns; subsets of these are benchmarked below.
ref_dfi = DataFrame(rand(1:10^4, 10^7, 4), :auto)
ref_dfs = string.(ref_dfi)
ref_dfp = mapcols(PooledArray, ref_dfs)

# Results table: one row per (size, element type, operation) measurement.
res = DataFrame(rows=Int[], cols=Int[], type=String[], op=String[], time=Float64[])

for x in (10, 10^6-1, 10^6, 10^7), y in 1:4
    dfi = ref_dfi[1:x, 1:y]
    dfs = ref_dfs[1:x, 1:y]
    dfp = ref_dfp[1:x, 1:y]

    @show (x, y) # ping that the process is alive
    push!(res, (x, y, "integer", "copy", @belapsed DataFrame($dfi)))
    push!(res, (x, y, "string", "copy", @belapsed DataFrame($dfs)))
    push!(res, (x, y, "pooled", "copy", @belapsed DataFrame($dfp)))
    push!(res, (x, y, "integer", ":", @belapsed $dfi[:, :]))
    push!(res, (x, y, "string", ":", @belapsed $dfs[:, :]))
    push!(res, (x, y, "pooled", ":", @belapsed $dfp[:, :]))
    push!(res, (x, y, "integer", "1:end-5", @belapsed $dfi[1:end-5, :]))
    push!(res, (x, y, "string", "1:end-5", @belapsed $dfs[1:end-5, :]))
    push!(res, (x, y, "pooled", "1:end-5", @belapsed $dfp[1:end-5, :]))
    # Use the same `1:5` UnitRange for all three element types so the
    # timings are directly comparable (previously the string and pooled
    # cases used `1:1:5`, a StepRange, which exercises a different
    # indexing path than the integer case despite the identical label).
    push!(res, (x, y, "integer", "1:5", @belapsed $dfi[1:5, :]))
    push!(res, (x, y, "string", "1:5", @belapsed $dfs[1:5, :]))
    push!(res, (x, y, "pooled", "1:5", @belapsed $dfp[1:5, :]))
end

# Convert seconds reported by @belapsed to milliseconds.
res.time *= 1_000

@show Threads.nthreads()
@show unstack(res, [:cols, :type, :op], :rows, :time)
3 changes: 3 additions & 0 deletions benchmarks/constructor_and_indexing/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Run the constructor/indexing benchmark with 1, 2, and 4 Julia threads
# (-t sets Threads.nthreads()) to measure the effect of multithreading.
julia -t 1 constructor_and_indexing_performance.jl
julia -t 2 constructor_and_indexing_performance.jl
julia -t 4 constructor_and_indexing_performance.jl
101 changes: 101 additions & 0 deletions benchmarks/joins/join_performance.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using CategoricalArrays
using DataFrames
using PooledArrays
using Random

# Run several full GC passes so each timed join below starts from a clean heap.
fullgc() = (GC.gc(true); GC.gc(true); GC.gc(true); GC.gc(true))

# Validate command-line arguments with explicit errors instead of `@assert`:
# asserts are meant for internal invariants and may be disabled at higher
# optimization levels, which would silently skip input validation.
# ARGS layout: llen rlen eltype duplication order nkeys jointype
length(ARGS) == 7 ||
    throw(ArgumentError("expected 7 arguments: llen rlen eltype duplication order nkeys jointype"))
ARGS[3] in ["int", "pool", "cat", "str"] ||
    throw(ArgumentError("argument 3 (column eltype) must be one of: int, pool, cat, str"))
ARGS[4] in ["uniq", "dup", "manydup"] ||
    throw(ArgumentError("argument 4 (duplication) must be one of: uniq, dup, manydup"))
ARGS[5] in ["sort", "rand"] ||
    throw(ArgumentError("argument 5 (row order) must be one of: sort, rand"))
ARGS[6] in ["1", "2"] ||
    throw(ArgumentError("argument 6 (number of join keys) must be 1 or 2"))
ARGS[7] in ["inner", "left", "right", "outer", "semi", "anti"] ||
    throw(ArgumentError("argument 7 (join kind) must be one of: inner, left, right, outer, semi, anti"))

@info ARGS

llen = parse(Int, ARGS[1])
rlen = parse(Int, ARGS[2])
llen > 1000 || throw(ArgumentError("left table length must be greater than 1000, got $llen"))
rlen > 2000 || throw(ArgumentError("right table length must be greater than 2000, got $rlen"))

# Zero-pad numbers to a common width so string keys compare column-wise.
pad = maximum(length.(string.((llen, rlen))))

# Build the two key columns according to the requested element type (ARGS[3])
# and duplication pattern (ARGS[4]): "uniq" keys occur once, "dup" twice,
# "manydup" twenty times.
# NOTE(review): "uniq" is not supported for "pool"/"cat" — the inner
# `@assert` in those branches fires in that case; confirm this is intended.
if ARGS[3] == "int"
    if ARGS[4] == "uniq"
        col1 = [1:llen;]
        col2 = [1:rlen;]
    elseif ARGS[4] == "dup"
        col1 = repeat(1:llen ÷ 2, inner=2)
        col2 = repeat(1:rlen ÷ 2, inner=2)
    else
        @assert ARGS[4] == "manydup"
        col1 = repeat(1:llen ÷ 20, inner=20)
        col2 = repeat(1:rlen ÷ 20, inner=20)
    end
elseif ARGS[3] == "pool"
    if ARGS[4] == "dup"
        col1 = PooledArray(repeat(string.(1:llen ÷ 2, pad=pad), inner=2))
        col2 = PooledArray(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2))
    else
        @assert ARGS[4] == "manydup"
        col1 = PooledArray(repeat(string.(1:llen ÷ 20, pad=pad), inner=20))
        col2 = PooledArray(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20))
    end
elseif ARGS[3] == "cat"
    if ARGS[4] == "dup"
        col1 = categorical(repeat(string.(1:llen ÷ 2, pad=pad), inner=2))
        col2 = categorical(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2))
    else
        @assert ARGS[4] == "manydup"
        col1 = categorical(repeat(string.(1:llen ÷ 20, pad=pad), inner=20))
        col2 = categorical(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20))
    end
else
    @assert ARGS[3] == "str"
    if ARGS[4] == "uniq"
        col1 = string.(1:llen, pad=pad)
        col2 = string.(1:rlen, pad=pad)
    elseif ARGS[4] == "dup"
        col1 = repeat(string.(1:llen ÷ 2, pad=pad), inner=2)
        col2 = repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)
    else
        @assert ARGS[4] == "manydup"
        col1 = repeat(string.(1:llen ÷ 20, pad=pad), inner=20)
        col2 = repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)
    end
end

Random.seed!(1234)

# "rand" shuffles the key columns; "sort" (validated above) leaves them sorted.
if ARGS[5] == "rand"
    shuffle!(col1)
    shuffle!(col2)
end

# Map the requested join kind to the corresponding DataFrames function.
const joinfun = Dict("inner" => innerjoin, "left" => leftjoin,
                     "right" => rightjoin, "outer" => outerjoin,
                     "semi" => semijoin, "anti" => antijoin)[ARGS[7]]

if ARGS[6] == "1"
    df1 = DataFrame(id1 = col1)
    df2 = DataFrame(id1 = col2)
    # warm-up calls on small subsets to trigger compilation before timing
    joinfun(df1[1:1000, :], df2[1:2000, :], on=:id1)
    joinfun(df2[1:2000, :], df1[1:1000, :], on=:id1)
    fullgc()
    @time joinfun(df1, df2, on=:id1)
    fullgc()
    @time joinfun(df2, df1, on=:id1)
else
    @assert ARGS[6] == "2"
    df1 = DataFrame(id1 = col1, id2 = col1)
    df2 = DataFrame(id1 = col2, id2 = col2)
    # warm-up calls on small subsets to trigger compilation before timing
    joinfun(df1[1:1000, :], df2[1:2000, :], on=[:id1, :id2])
    joinfun(df2[1:2000, :], df1[1:1000, :], on=[:id1, :id2])
    fullgc()
    @time joinfun(df1, df2, on=[:id1, :id2])
    fullgc()
    @time joinfun(df2, df1, on=[:id1, :id2])
end
12 changes: 12 additions & 0 deletions benchmarks/joins/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Benchmark every join kind on two size profiles:
# a small-vs-large pair (100_000 x 50_000_000) and a medium-vs-medium
# pair (5_000_000 x 10_000_000).
# NOTE(review): this invokes `runtests.jl`, but the script added in this
# commit is `join_performance.jl`, which requires 7 arguments — presumably
# `runtests.jl` is a renamed driver that expands these 3 arguments into
# the full matrix of eltype/duplication/order/nkeys; verify it exists.
julia runtests.jl 100000 50000000 inner
julia runtests.jl 5000000 10000000 inner
julia runtests.jl 100000 50000000 left
julia runtests.jl 5000000 10000000 left
julia runtests.jl 100000 50000000 right
julia runtests.jl 5000000 10000000 right
julia runtests.jl 100000 50000000 outer
julia runtests.jl 5000000 10000000 outer
julia runtests.jl 100000 50000000 semi
julia runtests.jl 5000000 10000000 semi
julia runtests.jl 100000 50000000 anti
julia runtests.jl 5000000 10000000 anti
File renamed without changes.
115 changes: 83 additions & 32 deletions src/dataframe/dataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,19 @@ struct DataFrame <: AbstractDataFrame

# we write into columns as we know that it is guaranteed
# that it was freshly allocated in the outer constructor
for (i, col) in enumerate(columns)
# check for vectors first as they are most common
if col isa AbstractRange
columns[i] = collect(col)
elseif col isa AbstractVector
columns[i] = copycols ? copy(col) : col
elseif col isa Union{AbstractArray{<:Any, 0}, Ref}
x = col[]
columns[i] = fill!(Tables.allocatecolumn(typeof(x), len), x)
@static if VERSION >= v"1.4"
if copycols && len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1
@sync for i in eachindex(columns)
Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols)
end
else
if col isa AbstractArray
throw(ArgumentError("adding AbstractArray other than AbstractVector " *
"as a column of a data frame is not allowed"))
for i in eachindex(columns)
columns[i] = _preprocess_column(columns[i], len, copycols)
end
columns[i] = fill!(Tables.allocatecolumn(typeof(col), len), col)
end
else
for i in eachindex(columns)
columns[i] = _preprocess_column(columns[i], len, copycols)
end
end

Expand All @@ -216,6 +214,22 @@ struct DataFrame <: AbstractDataFrame
end
end

# Normalize a user-supplied column into a fresh `AbstractVector`:
# ranges are materialized, vectors are copied (or passed through when
# `copycols=false`), scalars (including 0-dimensional arrays and `Ref`s)
# are broadcast into a `len`-element column, and any other `AbstractArray`
# is rejected.
function _preprocess_column(col::Any, len::Integer, copycols::Bool)
    # ranges are always materialized, regardless of `copycols`
    col isa AbstractRange && return collect(col)
    if col isa AbstractVector
        return copycols ? copy(col) : col
    end
    if col isa Union{AbstractArray{<:Any, 0}, Ref}
        # unwrap the boxed scalar and expand it to a full column
        val = col[]
        return fill!(Tables.allocatecolumn(typeof(val), len), val)
    end
    col isa AbstractArray &&
        throw(ArgumentError("adding AbstractArray other than AbstractVector " *
                            "as a column of a data frame is not allowed"))
    # any remaining value is treated as a scalar and repeated `len` times
    return fill!(Tables.allocatecolumn(typeof(col), len), col)
end

DataFrame(df::DataFrame; copycols::Bool=true) = copy(df, copycols=copycols)

function DataFrame(pairs::Pair{Symbol, <:Any}...; makeunique::Bool=false,
Expand Down Expand Up @@ -502,34 +516,75 @@ end
throw(BoundsError(df, (row_inds, col_inds)))
end
selected_columns = index(df)[col_inds]
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)[selected_columns]]
return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false)

u = _names(df)[selected_columns]
lookup = Dict{Symbol, Int}(zip(u, 1:length(u)))
# use this constructor to avoid checking twice if column names are not
# duplicate as index(df)[col_inds] already checks this
idx = Index(lookup, u)

if length(selected_columns) == 1
return DataFrame(AbstractVector[_columns(df)[selected_columns[1]][row_inds]],
idx, copycols=false)
else
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
@static if VERSION >= v"1.4"
if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1
new_columns = Vector{AbstractVector}(undef, length(selected_columns))
@sync for i in eachindex(new_columns)
Threads.@spawn new_columns[i] = _columns(df)[selected_columns[i]][selected_rows]
end
return DataFrame(new_columns, idx, copycols=false)
else
return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns],
idx, copycols=false)
end
else
return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns],
idx, copycols=false)
end
end
end

@inline function Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T
@boundscheck if !checkindex(Bool, axes(df, 1), row_inds)
throw(BoundsError(df, (row_inds, :)))
end
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)]
return DataFrame(new_columns, copy(index(df)), copycols=false)
idx = copy(index(df))

if ncol(df) == 1
return DataFrame(AbstractVector[_columns(df)[1][row_inds]], idx, copycols=false)
else
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
@static if VERSION >= v"1.4"
if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1
new_columns = Vector{AbstractVector}(undef, ncol(df))
@sync for i in eachindex(new_columns)
Threads.@spawn new_columns[i] = _columns(df)[i][selected_rows]
end
return DataFrame(new_columns, idx, copycols=false)
else
return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)],
idx, copycols=false)
end
else
return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)],
idx, copycols=false)
end
end
end

@inline Base.getindex(df::DataFrame, row_inds::Not,
col_inds::MultiColumnIndex) =
@inline Base.getindex(df::DataFrame, row_inds::Not, col_inds::MultiColumnIndex) =
df[axes(df, 1)[row_inds], col_inds]

# df[:, MultiColumnIndex] => DataFrame
Base.getindex(df::DataFrame, row_ind::Colon,
col_inds::MultiColumnIndex) =
Base.getindex(df::DataFrame, row_ind::Colon, col_inds::MultiColumnIndex) =
select(df, col_inds, copycols=true)

# df[!, MultiColumnIndex] => DataFrame
Base.getindex(df::DataFrame, row_ind::typeof(!),
col_inds::MultiColumnIndex) =
Base.getindex(df::DataFrame, row_ind::typeof(!), col_inds::MultiColumnIndex) =
select(df, col_inds, copycols=false)

##############################################################################
Expand Down Expand Up @@ -875,11 +930,7 @@ copies of column vectors in `df`.
If `copycols=false`, return a new `DataFrame` sharing column vectors with `df`.
"""
function Base.copy(df::DataFrame; copycols::Bool=true)
if copycols
df[:, :]
else
DataFrame(_columns(df), _names(df), copycols=false)
end
return DataFrame(copy(_columns(df)), copy(index(df)), copycols=copycols)
end

"""
Expand Down
7 changes: 7 additions & 0 deletions test/constructors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,11 @@ end
@test_throws ArgumentError DataFrame([Int, Float64], ["a", "b"], 2)
end

@testset "threading correctness tests" begin
    # exercise both the sequential path (small frames) and the
    # multithreaded path (>= 10^6 rows) of the DataFrame constructor/copy
    for nrows in (10, 2*10^6), ncols in 1:4
        frame = DataFrame(rand(nrows, ncols), :auto)
        @test frame == copy(frame)
    end
end

end # module
11 changes: 11 additions & 0 deletions test/indexing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2009,4 +2009,15 @@ if VERSION >= v"1.5"
include("indexing_offset.jl")
end

@testset "threading correctness tests" begin
    # compare getindex results against direct matrix slicing for both the
    # sequential path (small frames) and the multithreaded path (>= 10^6 rows),
    # across colon, range, vector, and boolean-mask row/column selectors
    for nrows in (10, 2*10^6), ncols in 1:4
        m = rand(nrows, ncols)
        df = DataFrame(m, :auto)
        for rsel in [:, 1:nrow(df)-5, collect(1:nrow(df)-5), axes(df, 1) .< nrow(df)-5],
            csel in [:, axes(df, 2), collect(axes(df, 2)), 1:ncol(df) - 1]
            @test DataFrame(m[rsel, csel], :auto) == df[rsel, csel]
        end
    end
end

end # module

0 comments on commit c0c8cd3

Please sign in to comment.