Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A little refactoring to improve inference and precompilation #880

Merged
merged 5 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/CSV.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const DEFAULT_MAX_INLINE_STRING_LENGTH = 32
const TRUE_STRINGS = ["true", "True", "TRUE", "T", "1"]
const FALSE_STRINGS = ["false", "False", "FALSE", "F", "0"]
const ValidSources = Union{Vector{UInt8}, SubArray{UInt8, 1, Vector{UInt8}}, IO, Cmd, AbstractString, AbstractPath}
const MAX_INPUT_SIZE = Int64(2)^42
const EMPTY_INT_ARRAY = Int64[]

include("keyworddocs.jl")
include("utils.jl")
Expand Down Expand Up @@ -67,4 +69,10 @@ end
include("precompile.jl")
_precompile_()

function __init__()
# CSV.File(IOBuffer(PRECOMPILE_DATA))
# foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA)))
# CSV.File(joinpath(dirname(pathof(CSV)), "..", "test", "testfiles", "promotions.csv"))
end

end # module
172 changes: 105 additions & 67 deletions src/context.jl
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ function checkinvalidcolumns(dict, argname, ncols, names)
return
end

@noinline nonconcretetypes(types) = throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))

struct Context
transpose::Val
transpose::Bool
name::String
names::Vector{Symbol}
rowsguess::Int64
Expand Down Expand Up @@ -139,7 +141,7 @@ struct Context
streaming::Bool
end

@refargs function Context(source,
@refargs function Context(source::ValidSources,
# file options
# header can be a row number, range of rows, or actual string vector
header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}},
Expand All @@ -150,39 +152,39 @@ end
transpose::Bool,
comment::Union{String, Nothing},
ignoreemptyrows::Bool,
ignoreemptylines,
ignoreemptylines::Union{Nothing, Bool},
select,
drop,
limit::Union{Integer, Nothing},
buffer_in_memory::Bool,
threaded,
threaded::Union{Nothing, Bool},
ntasks::Union{Nothing, Integer},
tasks,
tasks::Union{Nothing, Integer},
rows_to_check::Integer,
lines_to_check,
lines_to_check::Union{Nothing, Integer},
# parsing options
missingstrings,
missingstring,
delim,
missingstrings::Union{Nothing, String, Vector{String}},
missingstring::Union{Nothing, String, Vector{String}},
delim::Union{Nothing, UInt8, Char, String},
ignorerepeated::Bool,
quoted::Bool,
quotechar,
openquotechar,
closequotechar,
escapechar,
dateformat,
dateformats,
decimal,
truestrings,
falsestrings,
quotechar::Union{UInt8, Char},
openquotechar::Union{Nothing, UInt8, Char},
closequotechar::Union{Nothing, UInt8, Char},
escapechar::Union{UInt8, Char},
dateformat::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
dateformats::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
decimal::Union{UInt8, Char},
truestrings::Union{Nothing, Vector{String}},
falsestrings::Union{Nothing, Vector{String}},
# type options
type,
types,
typemap,
pool,
type::Union{Nothing, Type},
types::Union{Nothing, Type, AbstractVector, AbstractDict},
typemap::Dict,
pool::Union{Bool, Real, AbstractVector, AbstractDict},
downcast::Bool,
lazystrings,
stringtype,
lazystrings::Bool,
stringtype::StringTypes,
strict::Bool,
silencewarnings::Bool,
maxwarnings::Integer,
Expand All @@ -192,13 +194,15 @@ end

# initial argument validation and adjustment
@inbounds begin
!isa(source, IO) && !isa(source, AbstractVector{UInt8}) && !isa(source, Cmd) && !isfile(source) &&
throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
((source isa AbstractString || source isa AbstractPath) && !isfile(source)::Bool) && throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
if types !== nothing
if types isa AbstractVector || types isa AbstractDict
any(x->!concrete_or_concreteunion(x), types isa AbstractDict ? values(types) : types) && throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
if types isa AbstractVector
any(x->!concrete_or_concreteunion(x), types) && nonconcretetypes(types)
elseif types isa AbstractDict
typs = values(types)
any(x->!concrete_or_concreteunion(x), typs) && nonconcretetypes(typs)
else
concrete_or_concreteunion(types) || throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
concrete_or_concreteunion(types) || nonconcretetypes(types)
end
end
checkvaliddelim(delim)
Expand Down Expand Up @@ -239,14 +243,28 @@ end
Base.depwarn("`threaded` keyword argument is deprecated; to avoid multithreaded parsing, pass `ntasks=1`", :Context)
ntasks = threaded ? Threads.nthreads() : 1
end
header = (isa(header, Integer) && header == 1 && skipto == 1) ? -1 : header
isa(header, Integer) && skipto != -1 && (skipto > header || throw(ArgumentError("data row ($skipto) must come after header row ($header)")))
skipto = skipto == -1 ? (isa(header, Vector{Symbol}) || isa(header, Vector{String}) ? 0 : last(header)) + 1 : skipto # by default, data starts on line after header
if header isa Integer
if header == 1 && skipto == 1
header = -1
elseif skipto != -1 && skipto < header
throw(ArgumentError("skipto row ($skipto) must come after header row ($header)"))
end
end
if skipto == -1
if isa(header, Vector{Symbol}) || isa(header, Vector{String})
skipto = 0
elseif header isa Integer
# by default, data starts on line after header
skipto = header + 1
elseif header isa AbstractVector{<:Integer}
skipto = last(header) + 1
end
end
debug && println("header is: $header, skipto computed as: $skipto")
# getsource will turn any input into a `AbstractVector{UInt8}`
buf, pos, len, tempfile = getsource(source, buffer_in_memory)
if len > Int64(2)^42
throw(ArgumentError("delimited source to parse too large; must be < $(2^42) bytes"))
if len > MAX_INPUT_SIZE
throw(ArgumentError("delimited source to parse too large; must be < $MAX_INPUT_SIZE bytes"))
end
# skip over initial BOM character, if present
pos = consumeBOM(buf, pos)
Expand All @@ -259,9 +277,12 @@ end
sentinel = missingstring === nothing ? missingstring : (isempty(missingstring) || (missingstring isa Vector && length(missingstring) == 1 && missingstring[1] == "")) ? missing : missingstring isa String ? [missingstring] : missingstring

if delim === nothing
del = isa(source, AbstractString) && endswith(source, ".tsv") ? UInt8('\t') :
isa(source, AbstractString) && endswith(source, ".wsv") ? UInt8(' ') :
UInt8('\n')
if source isa AbstractString || source isa AbstractPath
filename = string(source)
del = endswith(filename, ".tsv") ? UInt8('\t') : endswith(filename, ".wsv") ? UInt8(' ') : UInt8('\n')
else
del = UInt8('\n')
end
else
del = (delim isa Char && isascii(delim)) ? delim % UInt8 :
(sizeof(delim) == 1 && isascii(delim)) ? delim[1] % UInt8 : delim
Expand All @@ -278,32 +299,47 @@ end
end

df = dateformat isa AbstractVector || dateformat isa AbstractDict ? nothing : dateformat
wh1 = UInt8(' ')
wh2 = UInt8('\t')
if sentinel isa Vector
for sent in sentinel
if contains(sent, " ")
wh1 = 0x00
end
if contains(sent, "\t")
wh2 = 0x00
end
end
end
headerpos = datapos = pos
if !transpose
# step 1: detect the byte position where the column names start (headerpos)
# and where the first data row starts (datapos)
headerpos, datapos = detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, header, skipto)
debug && println("headerpos = $headerpos, datapos = $datapos")

# step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
debug && println("estimated rows: $rowsguess")
debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")

# step 3: build Parsers.Options w/ parsing arguments
wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
if sentinel isa Vector
for sent in sentinel
if contains(sent, " ")
wh1 = 0x00
end
if contains(sent, "\t")
wh2 = 0x00
end
end
end
end
# step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
# step 3: build Parsers.Options w/ parsing arguments
if del isa UInt8
d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows, del)
wh1 = d == UInt(' ') ? 0x00 : wh1
wh2 = d == UInt8('\t') ? 0x00 : wh2
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
elseif del isa Char
_, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
d = del
elseif del isa String
_, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
d = del
else
error("invalid delim type")
end
debug && println("estimated rows: $rowsguess")
debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")

if !transpose
# step 4a: if we're ignoring repeated delimiters, then we ignore any
# that start a row, so we need to check if we need to adjust our headerpos/datapos
if ignorerepeated
Expand All @@ -318,10 +354,6 @@ end
ncols = length(names)
else
# transpose
d, rowsguess = detectdelimandguessrows(buf, pos, pos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
rowsguess, names, positions, endpositions = detecttranspose(buf, pos, len, options, header, skipto, normalizenames)
ncols = length(names)
datapos = isempty(positions) ? 0 : positions[1]
Expand Down Expand Up @@ -357,7 +389,7 @@ end
else
T = types === nothing ? (streaming ? Union{stringtype, Missing} : NeedsTypeDetection) : types
columns = Vector{Column}(undef, ncols)
foreach(1:ncols) do i
for i = 1:ncols
col = Column(T, options)
columns[i] = col
end
Expand Down Expand Up @@ -427,7 +459,7 @@ end
end
elseif select isa Base.Callable
for i = 1:ncols
select(i, names[i]) || willdrop!(columns, i)
select(i, names[i])::Bool || willdrop!(columns, i)
end
else
throw(ArgumentError("`select` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
Expand All @@ -448,7 +480,7 @@ end
end
elseif drop isa Base.Callable
for i = 1:ncols
drop(i, names[i]) && willdrop!(columns, i)
drop(i, names[i])::Bool && willdrop!(columns, i)
end
else
throw(ArgumentError("`drop` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
Expand All @@ -459,7 +491,7 @@ end
# determine if we can use threads while parsing
limit = something(limit, typemax(Int64))
minrows = min(limit, rowsguess)
nthreads = something(ntasks, Threads.nthreads())
nthreads = Int(something(ntasks, Threads.nthreads()))
if ntasks === nothing && !streaming && nthreads > 1 && !transpose && minrows > (nthreads * 5) && (minrows * ncols) >= 5_000
threaded = true
ntasks = nthreads
Expand Down Expand Up @@ -487,6 +519,7 @@ end
# but we also don't guarantee limit will be exact w/ multithreaded parsing
origrowsguess = rowsguess
if limit !== typemax(Int64)
limit = Int64(limit)
limitposguess = ceil(Int64, (limit / (origrowsguess * 0.8)) * len)
newlen = [0, limitposguess, min(limitposguess * 2, len)]
findrowstarts!(buf, options, newlen, ncols, columns, stringtype, downcast, 5)
Expand All @@ -495,7 +528,10 @@ end
debug && println("limiting, adjusting len to $len")
end
chunksize = div(len - datapos, ntasks)
chunkpositions = [i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i) for i = 0:ntasks]
chunkpositions = Vector{Int64}(undef, ntasks + 1)
for i = 0:ntasks
chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i)
end
debug && println("initial byte positions before adjusting for start of rows: $chunkpositions")
avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, downcast, rows_to_check)
if successfullychunked
Expand All @@ -506,7 +542,9 @@ end
debug && println("multi-threaded column types sampled as: $columns")
# check if we need to adjust column pooling
if finalpool == 0.0 || finalpool == 1.0
foreach(col -> col.pool = finalpool, columns)
for col in columns
col.pool = finalpool
end
end
else
debug && println("something went wrong chunking up a file for multithreaded parsing, falling back to single-threaded parsing")
Expand All @@ -521,7 +559,7 @@ end

end # @inbounds begin
return Context(
Val(transpose),
transpose,
getname(source),
names,
rowsguess,
Expand Down
Loading