diff --git a/NEWS.md b/NEWS.md index 0ee2d62a8e..055e5f6cf3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -33,12 +33,13 @@ ## Other relevant changes -* `innerjoin`, `leftjoin`, `rightjoin`, and `outerjoin` are now much faster and - check if passed data frames are sorted by the `on` columns and takes into account - if shorter data frame that is joined has unique values in `on` columns. - These aspects of input data frames might affect the order of rows produced in the output +* `innerjoin`, `leftjoin`, `rightjoin`, `outerjoin`, `semijoin`, and `antijoin` + are now much faster and check if passed data frames are sorted by the `on` + columns and take into account if shorter data frame that is joined has unique + values in `on` columns. These aspects of input data frames might affect the + order of rows produced in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612), - [#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) + [#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) # DataFrames v0.22 Release Notes diff --git a/Project.toml b/Project.toml index ca71a45517..0fe7a5d6c3 100644 --- a/Project.toml +++ b/Project.toml @@ -25,7 +25,7 @@ Unicode = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5" [compat] CategoricalArrays = "0.9.3" Compat = "3.17" -DataAPI = "1.4" +DataAPI = "1.6" InvertedIndices = "1" IteratorInterfaceExtensions = "0.1.1, 1" Missings = "0.4.2" diff --git a/benchmarks/join_performance.jl b/benchmarks/join_performance.jl index 88fc7101f3..af3b8a59ff 100644 --- a/benchmarks/join_performance.jl +++ b/benchmarks/join_performance.jl @@ -10,7 +10,7 @@ fullgc() = (GC.gc(true); GC.gc(true); GC.gc(true); GC.gc(true)) @assert ARGS[4] in ["uniq", "dup", "manydup"] @assert ARGS[5] in ["sort", "rand"] @assert ARGS[6] in ["1", "2"] -@assert ARGS[7] in ["inner", "left", "right", "outer"] +@assert ARGS[7] in ["inner", "left", "right", "outer", "semi", "anti"] @info ARGS @@ -76,7 +76,8 @@ else end const joinfun = Dict("inner" => innerjoin, "left" => leftjoin, - "right" => rightjoin, "outer" => outerjoin)[ARGS[7]] + "right" => rightjoin, "outer" => outerjoin, + "semi" => semijoin, "anti" => antijoin)[ARGS[7]] if ARGS[6] == "1" df1 = DataFrame(id1 = col1) diff --git a/benchmarks/run.sh b/benchmarks/run.sh index ceb00e5b80..0cd0c3811c 100644 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -6,3 +6,7 @@ julia runtests.jl 100000 50000000 right julia runtests.jl 5000000 10000000 right julia runtests.jl 100000 50000000 outer julia runtests.jl 5000000 10000000 outer +julia runtests.jl 100000 50000000 semi +julia runtests.jl 5000000 10000000 semi +julia runtests.jl 100000 50000000 anti +julia runtests.jl 5000000 10000000 anti diff --git a/src/join/composer.jl b/src/join/composer.jl index 40691ee672..e582ce1edf 100644 --- a/src/join/composer.jl +++ b/src/join/composer.jl @@ -339,33 +339,9 @@ function _join(df1::AbstractDataFrame, df2::AbstractDataFrame; joined, src_indicator = compose_joined_table(joiner, kind, makeunique, left_rename, right_rename, indicator) elseif kind == :semi - # hash the right rows - dfr_on_grp = group_rows(joiner.dfr_on) - # iterate over left rows and leave those found in right - left_ixs = Vector{Int}() - sizehint!(left_ixs, nrow(joiner.dfl)) - dfr_on_grp_cols = ntuple(i -> dfr_on_grp.df[!, i], ncol(dfr_on_grp.df)) - dfl_on_cols = ntuple(i -> joiner.dfl_on[!, i], ncol(joiner.dfl_on)) - @inbounds for l_ix in 1:nrow(joiner.dfl_on) - if findrow(dfr_on_grp, joiner.dfl_on, dfr_on_grp_cols, dfl_on_cols, l_ix) != 0 - push!(left_ixs, l_ix) - end - end - joined = joiner.dfl[left_ixs, :] + joined = joiner.dfl[find_semi_rows(joiner), :] elseif kind == :anti - # hash the right rows - dfr_on_grp = group_rows(joiner.dfr_on) - # iterate over left rows and leave those not found in right - leftonly_ixs = Vector{Int}() - sizehint!(leftonly_ixs, nrow(joiner.dfl)) - dfr_on_grp_cols = ntuple(i -> dfr_on_grp.df[!, i], ncol(dfr_on_grp.df)) - dfl_on_cols = ntuple(i -> joiner.dfl_on[!, i], ncol(joiner.dfl_on)) - @inbounds for l_ix in 1:nrow(joiner.dfl_on) - if findrow(dfr_on_grp, joiner.dfl_on, dfr_on_grp_cols, dfl_on_cols, l_ix) == 0 - push!(leftonly_ixs, l_ix) - end - end - joined = joiner.dfl[leftonly_ixs, :] + joined = joiner.dfl[.!find_semi_rows(joiner), :] else throw(ArgumentError("Unknown kind of join requested: $kind")) end diff --git a/src/join/core.jl b/src/join/core.jl index e59d2d9350..ae138eb8b4 100644 --- a/src/join/core.jl +++ b/src/join/core.jl @@ -1,3 +1,5 @@ +### Common preprocessing + struct OnColRow{T} row::Int cols::T @@ -96,7 +98,8 @@ check_mapping_allowed(short::AbstractVector, refarray_long::AbstractVector, !isempty(short) && !isnothing(refpool_long) && !isnothing(invrefpool_long) && eltype(refarray_long) <: Union{Signed, Unsigned} -@noinline map_refarray(mapping::AbstractVector, refarray::AbstractVector, ::Val{fi}) where {fi} = +@noinline map_refarray(mapping::AbstractVector, refarray::AbstractVector, + ::Val{fi}) where {fi} = [@inbounds mapping[r - fi + 1] for r in refarray] function map2refs(x::AbstractVector, invrefpool) @@ -115,6 +118,89 @@ function map2refs(x::AbstractVector, invrefpool) end end +function preprocess_columns(joiner::DataFrameJoiner) + right_len = length(joiner.dfr_on[!, 1]) + left_len = length(joiner.dfl_on[!, 1]) + right_shorter = right_len < left_len + + left_cols = collect(eachcol(joiner.dfl_on)) + right_cols = collect(eachcol(joiner.dfr_on)) + + # if column of the longer table supports DataAPI.refpool and DataAPI.invrefpool + # remap matching left and right columns to use refs + if right_shorter + for i in eachindex(left_cols, right_cols) + rc = right_cols[i] + lc = left_cols[i] + + lc_refs = DataAPI.refarray(lc) + lc_refpool = DataAPI.refpool(lc) + lc_invrefpool = DataAPI.invrefpool(lc) + if check_mapping_allowed(rc, lc_refs, lc_refpool, lc_invrefpool) + right_cols[i] = map2refs(rc, lc_invrefpool) + left_cols[i] = lc_refs + end + end + else + for i in eachindex(left_cols, right_cols) + rc = right_cols[i] + lc = left_cols[i] + + rc_refs = DataAPI.refarray(rc) + rc_refpool = DataAPI.refpool(rc) + rc_invrefpool = DataAPI.invrefpool(rc) + if check_mapping_allowed(lc, rc_refs, rc_refpool, rc_invrefpool) + right_cols[i] = rc_refs + left_cols[i] = map2refs(lc, rc_invrefpool) + end + end + end + + disallow_sorted = false + + for (lc, rc) in zip(left_cols, right_cols) + @assert length(lc) == left_len + @assert length(rc) == right_len + lc_et = nonmissingtype(eltype(lc)) + rc_et = nonmissingtype(eltype(rc)) + + # special case common safe scenarios when eltype between left and right + # column can be different or non-concrete + lc_et <: Real && rc_et <: Real && continue + lc_et <: AbstractString && rc_et <: AbstractString && continue + + # otherwise we require non-missing eltype of both sides to be the same and concrete + lc_et === rc_et && isconcretetype(lc_et) && continue + + # we disallow using sorted branch for some columns that theoretically + # could be safely sorted (e.g. having Any eltype but holding strings) + # for safety reasons assuming that such cases will be rare in practice + disallow_sorted = true + end + + # TODO: + # If DataAPI.invrefpool vectors are found in the "on" columns + # then potentially the following optimizations can be done: + # 1. identify rows in shorter table that should be dropped + # 2. develop custom _innerjoin_sorted and _innerjoin_unsorted that drop rows + # from shorter table that do not match rows from longer table based on + # PooledArray refpool check + # This optimization would significantly complicate the code (especially + # sorted path). It should be added if in practice we find that the use case + # is often enough and that the benefits are significant. The two cases when + # the benefits should be expected are: + # 1. Shorter table is sorted when we drop rows not matching longer table rows + # 2. Shorter table does not have duplicates when we drop rows not matching + # longer table rows + + left_col = prepare_on_col(left_cols...) + right_col = prepare_on_col(right_cols...) + + return left_col, right_col, right_shorter, disallow_sorted +end + +### innerjoin logic + @inline function find_next_range(x::AbstractArray, start::Int, start_value) stop_value = start_value n = length(x) @@ -157,7 +243,8 @@ function _innerjoin_sorted(left::AbstractArray, right::AbstractArray) idx = length(left_ixs) left_range = left_cur:left_new - 1 right_range = right_cur:right_new - 1 - to_grow = Base.checked_add(idx, Base.checked_mul(length(left_range), length(right_range))) + to_grow = Base.checked_add(idx, Base.checked_mul(length(left_range), + length(right_range))) resize!(left_ixs, to_grow) resize!(right_ixs, to_grow) @inbounds for right_i in right_range, left_i in left_range @@ -190,6 +277,14 @@ function _innerjoin_unsorted(left::AbstractArray, right::AbstractArray{T}) where dict = Dict{T, Int}() right_len = length(right) + # we make sure that: + # * we do not preallocate dict of size larger than half of size of Int + # (this is relevant in 32 bit architectures) + # * dict has at least 2x more slots than the number of values we + # might store in it to avoid reallocations of internal structures when + # we populate it later and to minimize the number of hash collisions; + # typically Dict allows for 16 probes; + # the value of multiplier is heuristic was determined by empirical tests sizehint!(dict, 2 * min(right_len, typemax(Int) >> 2)) right isa OnCol && _prehash(right) @@ -241,14 +336,15 @@ function _innerjoin_unsorted_int(left::AbstractVector{<:Union{Integer, Missing}} offset = 1 - Int(minv) # we are now sure it does not overflow len = Int(maxv) - Int(minv) + 2 - dict = zeros(Int, len) + group_map = zeros(Int, len) @inbounds for (idx_r, val_r) in enumerate(right) - i = ismissing(val_r) ? length(dict) : Int(val_r) + offset - if dict[i] > 0 - return _innerjoin_dup_int(left, right, dict, idx_r, offset, Int(minv), Int(maxv)) + i = val_r === missing ? length(group_map) : Int(val_r) + offset + if group_map[i] > 0 + return _innerjoin_dup_int(left, right, group_map, idx_r, offset, + Int(minv), Int(maxv)) end - dict[i] = idx_r + group_map[i] = idx_r end left_ixs = Int[] @@ -259,17 +355,14 @@ function _innerjoin_unsorted_int(left::AbstractVector{<:Union{Integer, Missing}} sizehint!(right_ixs, right_len) @inbounds for (idx_l, val_l) in enumerate(left) - # we use dict_index to make sure the following two operations are fast: - # - if index is found - get it and process it - # - if index is not found - do nothing - if ismissing(val_l) - idx_r = dict[end] + if val_l === missing + idx_r = group_map[end] if idx_r > 0 push!(left_ixs, idx_l) push!(right_ixs, idx_r) end elseif minv <= val_l <= maxv - idx_r = dict[Int(val_l) + offset] + idx_r = group_map[Int(val_l) + offset] if idx_r > 0 push!(left_ixs, idx_l) push!(right_ixs, idx_r) @@ -307,7 +400,7 @@ end function _innerjoin_dup_int(left::AbstractVector{<:Union{Integer, Missing}}, right::AbstractVector{<:Union{Integer, Missing}}, - dict::Vector{Int}, idx_r_start::Int, offset::Int, + group_map::Vector{Int}, idx_r_start::Int, offset::Int, minv::Int, maxv::Int) ngroups = idx_r_start - 1 right_len = length(right) @@ -316,19 +409,20 @@ function _innerjoin_dup_int(left::AbstractVector{<:Union{Integer, Missing}}, @inbounds for idx_r in idx_r_start:right_len val_r = right[idx_r] - i = ismissing(val_r) ? length(dict) : Int(val_r) + offset - dict_val = dict[i] - if dict_val > 0 - groups[idx_r] = dict_val + i = val_r === missing ? length(group_map) : Int(val_r) + offset + group_map_val = group_map[i] + if group_map_val > 0 + groups[idx_r] = group_map_val else ngroups += 1 groups[idx_r] = ngroups - dict[i] = ngroups + group_map[i] = ngroups end end @assert ngroups > 0 # we should not get here with 0-length right - return _innerjoin_postprocess_int(left, dict, groups, ngroups, right_len, offset, minv, maxv) + return _innerjoin_postprocess_int(left, group_map, groups, ngroups, right_len, + offset, minv, maxv) end function compute_join_indices!(groups::Vector{Int}, ngroups::Int, @@ -348,7 +442,8 @@ function compute_join_indices!(groups::Vector{Int}, ngroups::Int, end function _innerjoin_postprocess(left::AbstractArray, dict::Dict{T, Int}, - groups::Vector{Int}, ngroups::Int, right_len::Int) where {T} + groups::Vector{Int}, ngroups::Int, + right_len::Int) where {T} starts = zeros(Int, ngroups) rperm = Vector{Int}(undef, right_len) @@ -384,7 +479,7 @@ function _innerjoin_postprocess(left::AbstractArray, dict::Dict{T, Int}, end function _innerjoin_postprocess_int(left::AbstractVector{<:Union{Integer, Missing}}, - dict::Vector{Int}, + group_map::Vector{Int}, groups::Vector{Int}, ngroups::Int, right_len::Int, offset::Int, minv::Int, maxv::Int) starts = zeros(Int, ngroups) @@ -400,10 +495,10 @@ function _innerjoin_postprocess_int(left::AbstractVector{<:Union{Integer, Missin n = 0 @inbounds for (idx_l, val_l) in enumerate(left) - if ismissing(val_l) - group_id = dict[end] + if val_l === missing + group_id = group_map[end] elseif minv <= val_l <= maxv - group_id = dict[Int(val_l) + offset] + group_id = group_map[Int(val_l) + offset] else group_id = 0 end @@ -428,84 +523,299 @@ function _innerjoin_postprocess_int(left::AbstractVector{<:Union{Integer, Missin end function find_inner_rows(joiner::DataFrameJoiner) - right_len = length(joiner.dfr_on[!, 1]) - left_len = length(joiner.dfl_on[!, 1]) - right_shorter = right_len <= left_len - left_cols = collect(eachcol(joiner.dfl_on)) - right_cols = collect(eachcol(joiner.dfr_on)) + left_col, right_col, right_shorter, disallow_sorted = preprocess_columns(joiner) + + # we treat this case separately so we know we have at least one element later + (isempty(left_col) || isempty(right_col)) && return Int[], Int[] + + # if sorting is not disallowed try using a fast algorithm that works + # on sorted columns; if it is not run or errors fall back to the unsorted case + # the try-catch is used to handle the case when columns on which we join + # contain values that are not comparable + if !disallow_sorted + try + if issorted(left_col) && issorted(right_col) + return _innerjoin_sorted(left_col, right_col) + end + catch + # nothing to do - one of the columns is not sortable + end + end - # if column of a longer table supports DataAPI.refpool and DataAPI.invrefpool - # remap matching left and right columns to use refs if right_shorter - for i in eachindex(left_cols, right_cols) - rc = right_cols[i] - lc = left_cols[i] + if left_col isa AbstractVector{<:Union{Integer, Missing}} && + right_col isa AbstractVector{<:Union{Integer, Missing}} + return _innerjoin_unsorted_int(left_col, right_col) + else + return _innerjoin_unsorted(left_col, right_col) + end + else + if left_col isa AbstractVector{<:Union{Integer, Missing}} && + right_col isa AbstractVector{<:Union{Integer, Missing}} + return reverse(_innerjoin_unsorted_int(right_col, left_col)) + else + return reverse(_innerjoin_unsorted(right_col, left_col)) + end + end - lc_refs = DataAPI.refarray(lc) - lc_refpool = DataAPI.refpool(lc) - lc_invrefpool = DataAPI.invrefpool(lc) - if check_mapping_allowed(rc, lc_refs, lc_refpool, lc_invrefpool) - right_cols[i] = map2refs(rc, lc_invrefpool) - left_cols[i] = lc_refs + error("unreachable reached") +end + +### semijoin logic + +function _semijoin_sorted(left::AbstractArray, right::AbstractArray, + seen_rows::AbstractVector{Bool}) + left_n = length(left) + right_n = length(right) + + @assert left_n > 0 && right_n > 0 + + left_cur = 1 + left_val = left[left_cur] + left_new, left_tmp = find_next_range(left, left_cur, left_val) + + right_cur = 1 + right_val = right[right_cur] + right_new, right_tmp = find_next_range(right, right_cur, right_val) + + while left_cur <= left_n && right_cur <= right_n + if isequal(left_val, right_val) + seen_rows[left_cur:left_new - 1] .= true + left_cur, left_val = left_new, left_tmp + left_new, left_tmp = find_next_range(left, left_cur, left_val) + right_cur, right_val = right_new, right_tmp + right_new, right_tmp = find_next_range(right, right_cur, right_val) + elseif isless(left_val, right_val) + left_cur, left_val = left_new, left_tmp + left_new, left_tmp = find_next_range(left, left_cur, left_val) + else + right_cur, right_val = right_new, right_tmp + right_new, right_tmp = find_next_range(right, right_cur, right_val) + end + end + + return seen_rows +end + +# optimistically assume that shorter table does not have duplicates in on column +# if this is not the case we call _semijoin_dup +# which efficiently uses the work already done and continues with the more +# memory expensive algorithm that allows for duplicates +# note that in semijoin and antijoin we do not have to do it if right table is +# shorter as then we process left table row by row anyway +function _semijoin_unsorted(left::AbstractArray, right::AbstractArray{T}, + seen_rows::AbstractVector{Bool}, + right_shorter::Bool) where {T} + right_len = length(right) + right isa OnCol && _prehash(right) + left isa OnCol && _prehash(left) + + if right_shorter + @assert length(left) == length(seen_rows) + set = Set{T}() + sizehint!(set, 2 * min(right_len, typemax(Int) >> 2)) + for val_r in right + push!(set, val_r) + end + @inbounds for (idx_l, val_l) in enumerate(left) + seen_rows[idx_l] = val_l in set + end + else + @assert length(right) == length(seen_rows) + dict = Dict{T, Int}() + sizehint!(dict, 2 * min(right_len, typemax(Int) >> 2)) + for (idx_r, val_r) in enumerate(right) + haskey(dict, val_r) && return _semijoin_dup(left, right, dict, idx_r, + seen_rows) + dict[val_r] = idx_r + end + @inbounds for (idx_l, val_l) in enumerate(left) + # we know that dict contains only positive values + idx_r = get(dict, val_l, -1) + if idx_r != -1 + seen_rows[idx_r] = true + end + end + end + + return seen_rows +end + +function _semijoin_unsorted_int(left::AbstractVector{<:Union{Integer, Missing}}, + right::AbstractVector{<:Union{Integer, Missing}}, + seen_rows::AbstractVector{Bool}, + right_shorter::Bool) + minv, maxv = extrema_missing(right) + + val_range = big(maxv) - big(minv) + if val_range > typemax(Int) - 3 || val_range ÷ 2 > max(64, length(right)) || + minv < typemin(Int) + 2 || maxv > typemax(Int) - 3 + return _semijoin_unsorted(left, right, seen_rows, right_shorter) + end + + offset = 1 - Int(minv) # we are now sure it does not overflow + len = Int(maxv) - Int(minv) + 2 + group_map = zeros(Int, len) + + if right_shorter + @inbounds for (idx_r, val_r) in enumerate(right) + i = val_r === missing ? length(group_map) : Int(val_r) + offset + group_map[i] = idx_r + end + @inbounds for (idx_l, val_l) in enumerate(left) + if val_l === missing + idx_r = group_map[end] + seen_rows[idx_l] = idx_r > 0 + elseif minv <= val_l <= maxv + idx_r = group_map[Int(val_l) + offset] + seen_rows[idx_l] = idx_r > 0 end end else - for i in eachindex(left_cols, right_cols) - rc = right_cols[i] - lc = left_cols[i] + @inbounds for (idx_r, val_r) in enumerate(right) + i = val_r === missing ? length(group_map) : Int(val_r) + offset + if group_map[i] > 0 + return _semijoin_dup_int(left, right, group_map, idx_r, offset, + Int(minv), Int(maxv), seen_rows) + end + group_map[i] = idx_r + end + @inbounds for (idx_l, val_l) in enumerate(left) + if val_l === missing + idx_r = group_map[end] + if idx_r > 0 + seen_rows[idx_r] = true + end + elseif minv <= val_l <= maxv + idx_r = group_map[Int(val_l) + offset] + if idx_r > 0 + seen_rows[idx_r] = true + end + end + end + end - rc_refs = DataAPI.refarray(rc) - rc_refpool = DataAPI.refpool(rc) - rc_invrefpool = DataAPI.invrefpool(rc) - if check_mapping_allowed(lc, rc_refs, rc_refpool, rc_invrefpool) - right_cols[i] = rc_refs - left_cols[i] = map2refs(lc, rc_invrefpool) + return seen_rows +end + +# we fall back to general case if we have duplicates +# normally it should happen fast as we reuse work already done +function _semijoin_dup(left::AbstractArray, right::AbstractArray{T}, + dict::Dict{T, Int}, idx_r_start::Int, + seen_rows::AbstractVector{Bool}) where {T} + ngroups = idx_r_start - 1 + right_len = length(right) + groups = Vector{Int}(undef, right_len) + groups[1:ngroups] = 1:ngroups + + @inbounds for idx_r in idx_r_start:right_len + val_r = right[idx_r] + # we know that group ids are positive + group_id = get(dict, val_r, -1) + if group_id == -1 + ngroups += 1 + groups[idx_r] = ngroups + dict[val_r] = ngroups + else + groups[idx_r] = group_id + end + end + + @assert ngroups > 0 # we should not get here with 0-length right + @assert length(right) == length(seen_rows) + return _semijoin_postprocess(left, dict, groups, ngroups, right_len, + seen_rows) +end + +function _semijoin_dup_int(left::AbstractVector{<:Union{Integer, Missing}}, + right::AbstractVector{<:Union{Integer, Missing}}, + group_map::Vector{Int}, idx_r_start::Int, offset::Int, + minv::Int, maxv::Int, seen_rows::AbstractVector{Bool}) + ngroups = idx_r_start - 1 + right_len = length(right) + groups = Vector{Int}(undef, right_len) + groups[1:ngroups] = 1:ngroups + + @inbounds for idx_r in idx_r_start:right_len + val_r = right[idx_r] + i = val_r === missing ? length(group_map) : Int(val_r) + offset + group_map_val = group_map[i] + if group_map_val > 0 + groups[idx_r] = group_map_val + else + ngroups += 1 + groups[idx_r] = ngroups + group_map[i] = ngroups + end + end + + @assert ngroups > 0 # we should not get here with 0-length right + @assert length(right) == length(seen_rows) + return _semijoin_postprocess_int(left, group_map, groups, ngroups, right_len, + offset, minv, maxv, seen_rows) +end + +function _semijoin_postprocess(left::AbstractArray, dict::Dict{T, Int}, + groups::Vector{Int}, ngroups::Int, right_len::Int, + seen_rows::AbstractVector{Bool}) where {T} + starts = zeros(Int, ngroups) + rperm = Vector{Int}(undef, right_len) + + compute_join_indices!(groups, ngroups, starts, rperm) + + @inbounds for (idx_l, val_l) in enumerate(left) + group_id = get(dict, val_l, -1) + if group_id != -1 + ref_stop = starts[group_id + 1] + l = ref_stop - starts[group_id] + for i in 1:l + seen_rows[rperm[ref_stop - i + 1]] = true end end end - disallow_sorted = false + return seen_rows +end - for (lc, rc) in zip(left_cols, right_cols) - @assert length(lc) == left_len - @assert length(rc) == right_len - lc_et = nonmissingtype(eltype(lc)) - rc_et = nonmissingtype(eltype(rc)) +function _semijoin_postprocess_int(left::AbstractVector{<:Union{Integer, Missing}}, + group_map::Vector{Int}, + groups::Vector{Int}, ngroups::Int, right_len::Int, + offset::Int, minv::Int, maxv::Int, + seen_rows::AbstractVector{Bool}) + starts = zeros(Int, ngroups) + rperm = Vector{Int}(undef, right_len) - # special case common safe scenarios when eltype between left and right column - # can be different or non-concrete - lc_et <: Real && rc_et <: Real && continue - lc_et <: AbstractString && rc_et <: AbstractString && continue + compute_join_indices!(groups, ngroups, starts, rperm) - # otherwise we require non-missing eltype of both sides to be the same and concrete - lc_et === rc_et && isconcretetype(lc_et) && continue + @inbounds for (idx_l, val_l) in enumerate(left) + if val_l === missing + group_id = group_map[end] + elseif minv <= val_l <= maxv + group_id = group_map[Int(val_l) + offset] + else + group_id = 0 + end - # we disallow using sorted branch for some columns that theoretically - # could be safely sorted (e.g. having Any eltype but holding strings) - # for safety reasons assuming that such cases will be rare in practice - disallow_sorted = true + if group_id > 0 + ref_stop = starts[group_id + 1] + l = ref_stop - starts[group_id] + for i in 1:l + seen_rows[rperm[ref_stop - i + 1]] = true + end + end end - # TODO: - # If DataAPI.invrefpool vectors are found in the "on" columns - # then potentially the following optimizations can be done: - # 1. identify rows in shorter table that should be dropped - # 2. develop custom _innerjoin_sorted and _innerjoin_unsorted that - # drop rows from shorter table that do not match rows from longer table based on - # PooledArray refpool check - # This optimization significantly complicates the code (especially sorted path). - # It should be added if in practice we find that the use case is often enough - # and that the benefits are significant. The two cases when the benefits should - # be expected are: - # 1. Shorter table is sorted when we drop rows not matching longer table rows - # 2. Shorter table does not have duplicates when we drop rows not matching longer table rows + return seen_rows +end - left_col = prepare_on_col(left_cols...) - right_col = prepare_on_col(right_cols...) +function find_semi_rows(joiner::DataFrameJoiner) + left_col, right_col, right_shorter, disallow_sorted = preprocess_columns(joiner) + + seen_rows = falses(length(left_col)) # we treat this case separately so we know we have at least one element later - (isempty(left_col) || isempty(right_col)) && return Int[], Int[] + (isempty(left_col) || isempty(right_col)) && return falses(length(left_col)) # if sorting is not disallowed try using a fast algorithm that works # on sorted columns; if it is not run or errors fall back to the unsorted case @@ -514,7 +824,7 @@ function find_inner_rows(joiner::DataFrameJoiner) if !disallow_sorted try if issorted(left_col) && issorted(right_col) - return _innerjoin_sorted(left_col, right_col) + return _semijoin_sorted(left_col, right_col, seen_rows) end catch # nothing to do - one of the columns is not sortable @@ -524,18 +834,17 @@ function find_inner_rows(joiner::DataFrameJoiner) if right_shorter if left_col isa AbstractVector{<:Union{Integer, Missing}} && right_col isa AbstractVector{<:Union{Integer, Missing}} - return _innerjoin_unsorted_int(left_col, right_col) + return _semijoin_unsorted_int(left_col, right_col, seen_rows, right_shorter) else - return _innerjoin_unsorted(left_col, right_col) + return _semijoin_unsorted(left_col, right_col, seen_rows, right_shorter) end else if left_col isa AbstractVector{<:Union{Integer, Missing}} && right_col isa AbstractVector{<:Union{Integer, Missing}} - return reverse(_innerjoin_unsorted_int(right_col, left_col)) + return _semijoin_unsorted_int(right_col, left_col, seen_rows, right_shorter) else - return reverse(_innerjoin_unsorted(right_col, left_col)) + return _semijoin_unsorted(right_col, left_col, seen_rows, right_shorter) end end - error("unreachable reached") end diff --git a/test/join.jl b/test/join.jl index 7587c7a3d7..9f1f399281 100644 --- a/test/join.jl +++ b/test/join.jl @@ -1014,7 +1014,7 @@ end end end -@testset "innerjoin correctness tests" begin +@testset "join correctness tests" begin @test_throws ArgumentError DataFrames.prepare_on_col() @@ -1025,7 +1025,7 @@ end df_inner = DataFrame(id=[], x=[], y=[]) for i in axes(df1, 1), j in axes(df2, 1) if isequal(df1.id[i], df2.id[j]) - v = df1.id[i] isa CategoricalValue ? get(df1.id[i]) : df1.id[i] + v = df1.id[i] isa CategoricalValue ? unwrap(df1.id[i]) : df1.id[i] push!(df_inner, (id=v, x=df1.x[i], y=df2.y[j])) end end @@ -1033,7 +1033,7 @@ end df_left_part = DataFrame(id=[], x=[], y=[]) for i in axes(df1, 1) if !(df1.id[i] in Set(df2.id)) - v = df1.id[i] isa CategoricalValue ? get(df1.id[i]) : df1.id[i] + v = df1.id[i] isa CategoricalValue ? unwrap(df1.id[i]) : df1.id[i] push!(df_left_part, (id=v, x=df1.x[i], y=missing)) end end @@ -1041,7 +1041,7 @@ end df_right_part = DataFrame(id=[], x=[], y=[]) for i in axes(df2, 1) if !(df2.id[i] in Set(df1.id)) - v = df2.id[i] isa CategoricalValue ? get(df2.id[i]) : df2.id[i] + v = df2.id[i] isa CategoricalValue ? unwrap(df2.id[i]) : df2.id[i] push!(df_right_part, (id=v, x=missing, y=df2.y[i])) end end @@ -1050,6 +1050,9 @@ end df_right = vcat(df_inner, df_right_part) df_outer = vcat(df_inner, df_left_part, df_right_part) + df_semi = df1[[x in Set(df2.id) for x in df1.id], :] + df_anti = df1[[!(x in Set(df2.id)) for x in df1.id], :] + df1x = copy(df1) df1x.id2 = copy(df1x.id) df2x = copy(df2) @@ -1069,18 +1072,26 @@ end df_left2 = copy(df_left) df_right2 = copy(df_right) df_outer2 = copy(df_outer) + df_semi2 = copy(df_semi) + df_anti2 = copy(df_anti) insertcols!(df_inner2, 3, :id2 => df_inner2.id) insertcols!(df_left2, 3, :id2 => df_left2.id) insertcols!(df_right2, 3, :id2 => df_right2.id) insertcols!(df_outer2, 3, :id2 => df_outer2.id) + insertcols!(df_semi2, 3, :id2 => df_semi2.id) + insertcols!(df_anti2, 3, :id2 => df_anti2.id) df_inner3 = copy(df_inner2) df_left3 = copy(df_left2) df_right3 = copy(df_right2) df_outer3 = copy(df_outer2) + df_semi3 = copy(df_semi2) + df_anti3 = copy(df_anti2) insertcols!(df_inner3, 4, :id3 => df_inner3.id) insertcols!(df_left3, 4, :id3 => df_left3.id) insertcols!(df_right3, 4, :id3 => df_right3.id) insertcols!(df_outer3, 4, :id3 => df_outer3.id) + insertcols!(df_semi3, 4, :id3 => df_semi3.id) + insertcols!(df_anti3, 4, :id3 => df_anti3.id) return df_inner ≅ sort(innerjoin(df1, df2, on=:id, matchmissing=:equal), [:x, :y]) && df_inner2 ≅ sort(innerjoin(df1x, df2x, on=[:id, :id2], matchmissing=:equal), [:x, :y]) && @@ -1093,7 +1104,13 @@ end df_right3 ≅ sort(rightjoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal), [:x, :y]) && df_outer ≅ sort(outerjoin(df1, df2, on=:id, matchmissing=:equal), [:x, :y]) && df_outer2 ≅ sort(outerjoin(df1x, df2x, on=[:id, :id2], matchmissing=:equal), [:x, :y]) && - df_outer3 ≅ sort(outerjoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal), [:x, :y]) + df_outer3 ≅ sort(outerjoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal), [:x, :y]) && + df_semi ≅ semijoin(df1, df2, on=:id, matchmissing=:equal) && + df_semi2 ≅ semijoin(df1x, df2x, on=[:id, :id2], matchmissing=:equal) && + df_semi3 ≅ semijoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal) && + df_anti ≅ antijoin(df1, df2, on=:id, matchmissing=:equal) && + df_anti2 ≅ antijoin(df1x, df2x, on=[:id, :id2], matchmissing=:equal) && + df_anti3 ≅ antijoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal) end Random.seed!(1234) @@ -1175,6 +1192,10 @@ end DataFrame(id=[])) @test isequal_coltyped(outerjoin(DataFrame(id=[]), DataFrame(id=[]), on=:id), DataFrame(id=[])) + @test isequal_coltyped(semijoin(DataFrame(id=[]), DataFrame(id=[]), on=:id), + DataFrame(id=[])) + @test isequal_coltyped(antijoin(DataFrame(id=[]), DataFrame(id=[]), on=:id), + DataFrame(id=[])) @test isequal_coltyped(innerjoin(DataFrame(id=[]), DataFrame(id=[1, 2, 3]), on=:id), DataFrame(id=[])) @@ -1184,6 +1205,10 @@ end DataFrame(id=[1, 2, 3])) @test isequal_coltyped(outerjoin(DataFrame(id=[]), DataFrame(id=[1, 2, 3]), on=:id), DataFrame(id=Any[1, 2, 3])) + @test isequal_coltyped(semijoin(DataFrame(id=[]), DataFrame(id=[1, 2, 3]), on=:id), + DataFrame(id=[])) + @test isequal_coltyped(antijoin(DataFrame(id=[]), DataFrame(id=[1, 2, 3]), on=:id), + DataFrame(id=[])) @test isequal_coltyped(innerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[]), on=:id), DataFrame(id=Int[])) @@ -1193,6 +1218,10 @@ end DataFrame(id=Any[])) @test isequal_coltyped(outerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[]), on=:id), DataFrame(id=Any[1, 2, 3])) + @test isequal_coltyped(semijoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[]), on=:id), + DataFrame(id=Int[])) + @test isequal_coltyped(antijoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[]), on=:id), + DataFrame(id=[1, 2, 3])) @test isequal_coltyped(innerjoin(DataFrame(id=[4, 5, 6]), DataFrame(id=[1, 2, 3]), on=:id), DataFrame(id=Int[])) @@ -1202,6 +1231,10 @@ end DataFrame(id=Int[1, 2, 3])) @test isequal_coltyped(outerjoin(DataFrame(id=[4, 5, 6]), DataFrame(id=[1, 2, 3]), on=:id), DataFrame(id=Int[4, 5, 6, 1, 2, 3])) + @test isequal_coltyped(semijoin(DataFrame(id=[4, 5, 6]), DataFrame(id=[1, 2, 3]), on=:id), + DataFrame(id=Int[])) + @test isequal_coltyped(antijoin(DataFrame(id=[4, 5, 6]), DataFrame(id=[1, 2, 3]), on=:id), + DataFrame(id=[4, 5, 6])) @test isequal_coltyped(innerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[4, 5, 6]), on=:id), DataFrame(id=Int[])) @@ -1211,6 +1244,10 @@ end DataFrame(id=Int[4, 5, 6])) @test isequal_coltyped(outerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[4, 5, 6]), on=:id), DataFrame(id=Int[1, 2, 3, 4, 5, 6])) + @test isequal_coltyped(semijoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[4, 5, 6]), on=:id), + DataFrame(id=Int[])) + @test isequal_coltyped(antijoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[4, 5, 6]), on=:id), + DataFrame(id=[1, 2, 3])) @test isequal_coltyped(innerjoin(DataFrame(id=[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=Missing[])) @@ -1220,6 +1257,10 @@ end DataFrame(id=[1])) @test isequal_coltyped(outerjoin(DataFrame(id=[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=[missing, 1])) + @test isequal_coltyped(semijoin(DataFrame(id=[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=Missing[])) + @test isequal_coltyped(antijoin(DataFrame(id=[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=[missing])) @test isequal_coltyped(innerjoin(DataFrame(id=Missing[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=Missing[])) @@ -1229,6 +1270,10 @@ end DataFrame(id=[1])) @test isequal_coltyped(outerjoin(DataFrame(id=Missing[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=Union{Int, Missing}[1])) + @test isequal_coltyped(semijoin(DataFrame(id=Missing[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=Missing[])) + @test isequal_coltyped(antijoin(DataFrame(id=Missing[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=Missing[])) @test isequal_coltyped(innerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=Union{Int, Missing}[])) @@ -1238,6 +1283,10 @@ end DataFrame(id=[1])) @test isequal_coltyped(outerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=Union{Int, Missing}[1])) + @test isequal_coltyped(semijoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) + @test isequal_coltyped(antijoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[1]), on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) @test isequal_coltyped(innerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[2, 1, 2]), on=:id, matchmissing=:equal), DataFrame(id=Union{Int, Missing}[])) @@ -1247,6 +1296,10 @@ end DataFrame(id=[2, 1, 2])) @test isequal_coltyped(outerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[2, 1, 2]), on=:id, matchmissing=:equal), DataFrame(id=Union{Int, Missing}[2, 1, 2])) + @test isequal_coltyped(semijoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[2, 1, 2]), on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) + @test isequal_coltyped(antijoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[2, 1, 2]), on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) @test isequal_coltyped(innerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), @@ -1260,6 +1313,12 @@ end @test isequal_coltyped(outerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal), DataFrame(id=[missing, 1])) + @test isequal_coltyped(semijoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1]), + on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) + @test isequal_coltyped(antijoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1]), + on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[missing])) @test isequal_coltyped(innerjoin(DataFrame(id=[missing]), DataFrame(id=[1, missing]), on=:id, matchmissing=:equal), @@ -1273,6 +1332,12 @@ end @test isequal_coltyped(outerjoin(DataFrame(id=[missing]), DataFrame(id=[1, missing]), on=:id, matchmissing=:equal), DataFrame(id=[missing, 1])) + @test isequal_coltyped(semijoin(DataFrame(id=[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal), + DataFrame(id=[missing])) + @test isequal_coltyped(antijoin(DataFrame(id=[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal), + DataFrame(id=Missing[])) @test isequal_coltyped(innerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1, missing]), on=:id, matchmissing=:equal), @@ -1286,6 +1351,12 @@ end @test isequal_coltyped(outerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1, missing]), on=:id, matchmissing=:equal), DataFrame(id=[missing, 1])) + @test isequal_coltyped(semijoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[missing])) + @test isequal_coltyped(antijoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal), + DataFrame(id=Union{Int, Missing}[])) @test isequal_coltyped(innerjoin(DataFrame(id=[typemin(Int) + 1, typemin(Int)]), DataFrame(id=[typemin(Int)]), on=:id), DataFrame(id=[typemin(Int)])) @@ -1295,6 +1366,10 @@ end DataFrame(id=[typemin(Int)])) @test isequal_coltyped(outerjoin(DataFrame(id=[typemin(Int) + 1, typemin(Int)]), DataFrame(id=[typemin(Int)]), on=:id), DataFrame(id=[typemin(Int), typemin(Int) + 1])) + @test isequal_coltyped(semijoin(DataFrame(id=[typemin(Int) + 1, typemin(Int)]), DataFrame(id=[typemin(Int)]), on=:id), + DataFrame(id=[typemin(Int)])) + @test isequal_coltyped(antijoin(DataFrame(id=[typemin(Int) + 1, typemin(Int)]), DataFrame(id=[typemin(Int)]), on=:id), + DataFrame(id=[typemin(Int) + 1])) @test isequal_coltyped(innerjoin(DataFrame(id=[typemax(Int), typemax(Int) - 1]), DataFrame(id=[typemax(Int)]), on=:id), DataFrame(id=[typemax(Int)])) @@ -1304,6 +1379,10 @@ end DataFrame(id=[typemax(Int)])) @test isequal_coltyped(outerjoin(DataFrame(id=[typemax(Int), typemax(Int) - 1]), DataFrame(id=[typemax(Int)]), on=:id), DataFrame(id=[typemax(Int), typemax(Int) - 1])) + @test isequal_coltyped(semijoin(DataFrame(id=[typemax(Int), typemax(Int) - 1]), DataFrame(id=[typemax(Int)]), on=:id), + DataFrame(id=[typemax(Int)])) + @test isequal_coltyped(antijoin(DataFrame(id=[typemax(Int), typemax(Int) - 1]), DataFrame(id=[typemax(Int)]), on=:id), + DataFrame(id=[typemax(Int) - 1])) @test isequal_coltyped(innerjoin(DataFrame(id=[2000, 2, 100]), DataFrame(id=[2000, 1, 100]), on=:id), DataFrame(id=[2000, 100])) @@ -1313,6 +1392,10 @@ end DataFrame(id=[2000, 100, 1])) @test isequal_coltyped(outerjoin(DataFrame(id=[2000, 2, 100]), DataFrame(id=[2000, 1, 100]), on=:id), DataFrame(id=[2000, 100, 2, 1])) + @test isequal_coltyped(semijoin(DataFrame(id=[2000, 2, 100]), DataFrame(id=[2000, 1, 100]), on=:id), + DataFrame(id=[2000, 100])) + @test isequal_coltyped(antijoin(DataFrame(id=[2000, 2, 100]), DataFrame(id=[2000, 1, 100]), on=:id), + DataFrame(id=[2])) @test isequal_coltyped(outerjoin(DataFrame(id=[1]), DataFrame(id=[4.5]), on=:id), DataFrame(id=[1, 4.5])) @@ -1392,15 +1475,15 @@ end df2[1, :a] = missing m1 = innerjoin(df1, df2, on = [:a, :b], matchmissing=:equal) - @test m1 == DataFrame(a=[:x, :x, :x, :x, :x, :y, :x, :x], - b=[:A, :A, :A, :A, :B, :B, :A, :A], - v1=[1, 1, 2, 2, 3, 4, 5, 5], - v2=[3, 6, 3, 6, 4, 2, 3, 6]) + @test sort(m1) == sort(DataFrame(a=[:x, :x, :x, :x, :x, :y, :x, :x], + b=[:A, :A, :A, :A, :B, :B, :A, :A], + v1=[1, 1, 2, 2, 3, 4, 5, 5], + v2=[3, 6, 3, 6, 4, 2, 3, 6])) m2 = outerjoin(df1, df2, on = [:a, :b], matchmissing=:equal) - @test m2 ≅ DataFrame(a=[:x, :x, :x, :x, :x, :y, :x, :x, :x, missing, :x], - b=[:A, :A, :A, :A, :B, :B, :A, :A, :D, :A, :C], - v1=[1, 1, 2, 2, 3, 4, 5, 5, 6, missing, missing], - v2=[3, 6, 3, 6, 4, 2, 3, 6, missing, 1, 5]) + @test sort(m2) ≅ sort(DataFrame(a=[:x, :x, :x, :x, :x, :y, :x, :x, :x, missing, :x], + b=[:A, :A, :A, :A, :B, :B, :A, :A, :D, :A, :C], + v1=[1, 1, 2, 2, 3, 4, 5, 5, 6, missing, missing], + v2=[3, 6, 3, 6, 4, 2, 3, 6, missing, 1, 5])) Random.seed!(1) df1 = DataFrame(a = ["abc", "abx", "axz", "def", "dfr"], v1 = randn(5))