diff --git a/src/CSV.jl b/src/CSV.jl
index 37de2090..617cdcf2 100644
--- a/src/CSV.jl
+++ b/src/CSV.jl
@@ -39,6 +39,8 @@ const DEFAULT_MAX_INLINE_STRING_LENGTH = 32
 const TRUE_STRINGS = ["true", "True", "TRUE", "T", "1"]
 const FALSE_STRINGS = ["false", "False", "FALSE", "F", "0"]
 const ValidSources = Union{Vector{UInt8}, SubArray{UInt8, 1, Vector{UInt8}}, IO, Cmd, AbstractString, AbstractPath}
+const MAX_INPUT_SIZE = Int64(2)^42
+const EMPTY_INT_ARRAY = Int64[]
 
 include("keyworddocs.jl")
 include("utils.jl")
@@ -67,4 +69,10 @@ end
 include("precompile.jl")
 _precompile_()
 
+function __init__()
+    # CSV.File(IOBuffer(PRECOMPILE_DATA))
+    # foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA)))
+    # CSV.File(joinpath(dirname(pathof(CSV)), "..", "test", "testfiles", "promotions.csv"))
+end
+
 end # module
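[Note — src/CSV.jl] Hoisting the size limit into `MAX_INPUT_SIZE = Int64(2)^42` does more than name a magic number: the old error message interpolated `$(2^42)`, and on a 32-bit build (where integer literals are `Int32`) that expression silently wraps to 0. An illustrative check:

    julia> Int32(2)^42    # what a plain 2^42 computes on a 32-bit system
    0

    julia> Int64(2)^42
    4398046511104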
diff --git a/src/context.jl b/src/context.jl
index 15104d7d..4a4a0bc5 100644
--- a/src/context.jl
+++ b/src/context.jl
@@ -110,8 +110,10 @@ function checkinvalidcolumns(dict, argname, ncols, names)
     return
 end
 
+@noinline nonconcretetypes(types) = throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
+
 struct Context
-    transpose::Val
+    transpose::Bool
     name::String
     names::Vector{Symbol}
    rowsguess::Int64
@@ -139,7 +141,7 @@ struct Context
     streaming::Bool
 end
 
-@refargs function Context(source,
+@refargs function Context(source::ValidSources,
     # file options
     # header can be a row number, range of rows, or actual string vector
     header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}},
@@ -150,39 +152,39 @@
     transpose::Bool,
     comment::Union{String, Nothing},
     ignoreemptyrows::Bool,
-    ignoreemptylines,
+    ignoreemptylines::Union{Nothing, Bool},
     select,
     drop,
     limit::Union{Integer, Nothing},
     buffer_in_memory::Bool,
-    threaded,
+    threaded::Union{Nothing, Bool},
     ntasks::Union{Nothing, Integer},
-    tasks,
+    tasks::Union{Nothing, Integer},
     rows_to_check::Integer,
-    lines_to_check,
+    lines_to_check::Union{Nothing, Integer},
     # parsing options
-    missingstrings,
-    missingstring,
-    delim,
+    missingstrings::Union{Nothing, String, Vector{String}},
+    missingstring::Union{Nothing, String, Vector{String}},
+    delim::Union{Nothing, UInt8, Char, String},
     ignorerepeated::Bool,
     quoted::Bool,
-    quotechar,
-    openquotechar,
-    closequotechar,
-    escapechar,
-    dateformat,
-    dateformats,
-    decimal,
-    truestrings,
-    falsestrings,
+    quotechar::Union{UInt8, Char},
+    openquotechar::Union{Nothing, UInt8, Char},
+    closequotechar::Union{Nothing, UInt8, Char},
+    escapechar::Union{UInt8, Char},
+    dateformat::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
+    dateformats::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
+    decimal::Union{UInt8, Char},
+    truestrings::Union{Nothing, Vector{String}},
+    falsestrings::Union{Nothing, Vector{String}},
     # type options
-    type,
-    types,
-    typemap,
-    pool,
+    type::Union{Nothing, Type},
+    types::Union{Nothing, Type, AbstractVector, AbstractDict},
+    typemap::Dict,
+    pool::Union{Bool, Real, AbstractVector, AbstractDict},
     downcast::Bool,
-    lazystrings,
-    stringtype,
+    lazystrings::Bool,
+    stringtype::StringTypes,
     strict::Bool,
     silencewarnings::Bool,
     maxwarnings::Integer,
@@ -192,13 +194,15 @@ end
 
     # initial argument validation and adjustment
     @inbounds begin
-        !isa(source, IO) && !isa(source, AbstractVector{UInt8}) && !isa(source, Cmd) && !isfile(source) &&
-            throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
+        ((source isa AbstractString || source isa AbstractPath) && !isfile(source)::Bool) && throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
         if types !== nothing
-            if types isa AbstractVector || types isa AbstractDict
-                any(x->!concrete_or_concreteunion(x), types isa AbstractDict ? values(types) : types) && throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
+            if types isa AbstractVector
+                any(x->!concrete_or_concreteunion(x), types) && nonconcretetypes(types)
+            elseif types isa AbstractDict
+                typs = values(types)
+                any(x->!concrete_or_concreteunion(x), typs) && nonconcretetypes(typs)
             else
-                concrete_or_concreteunion(types) || throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
+                concrete_or_concreteunion(types) || nonconcretetypes(types)
             end
         end
         checkvaliddelim(delim)
@@ -239,14 +243,28 @@ end
             Base.depwarn("`threaded` keyword argument is deprecated; to avoid multithreaded parsing, pass `ntasks=1`", :Context)
             ntasks = threaded ? Threads.nthreads() : 1
         end
-        header = (isa(header, Integer) && header == 1 && skipto == 1) ? -1 : header
-        isa(header, Integer) && skipto != -1 && (skipto > header || throw(ArgumentError("data row ($skipto) must come after header row ($header)")))
-        skipto = skipto == -1 ? (isa(header, Vector{Symbol}) || isa(header, Vector{String}) ? 0 : last(header)) + 1 : skipto # by default, data starts on line after header
+        if header isa Integer
+            if header == 1 && skipto == 1
+                header = -1
+            elseif skipto != -1 && skipto < header
+                throw(ArgumentError("skipto row ($skipto) must come after header row ($header)"))
+            end
+        end
+        if skipto == -1
+            if isa(header, Vector{Symbol}) || isa(header, Vector{String})
+                skipto = 0
+            elseif header isa Integer
+                # by default, data starts on line after header
+                skipto = header + 1
+            elseif header isa AbstractVector{<:Integer}
+                skipto = last(header) + 1
+            end
+        end
         debug && println("header is: $header, skipto computed as: $skipto")
         # getsource will turn any input into a `AbstractVector{UInt8}`
         buf, pos, len, tempfile = getsource(source, buffer_in_memory)
-        if len > Int64(2)^42
-            throw(ArgumentError("delimited source to parse too large; must be < $(2^42) bytes"))
+        if len > MAX_INPUT_SIZE
+            throw(ArgumentError("delimited source to parse too large; must be < $MAX_INPUT_SIZE bytes"))
         end
         # skip over initial BOM character, if present
         pos = consumeBOM(buf, pos)
@@ -259,9 +277,12 @@ end
 
         sentinel = missingstring === nothing ? missingstring : (isempty(missingstring) || (missingstring isa Vector && length(missingstring) == 1 && missingstring[1] == "")) ? missing : missingstring isa String ? [missingstring] : missingstring
         if delim === nothing
-            del = isa(source, AbstractString) && endswith(source, ".tsv") ? UInt8('\t') :
-                isa(source, AbstractString) && endswith(source, ".wsv") ? UInt8(' ') :
-                UInt8('\n')
+            if source isa AbstractString || source isa AbstractPath
+                filename = string(source)
+                del = endswith(filename, ".tsv") ? UInt8('\t') : endswith(filename, ".wsv") ? UInt8(' ') : UInt8('\n')
+            else
+                del = UInt8('\n')
+            end
         else
             del = (delim isa Char && isascii(delim)) ? delim % UInt8 :
                 (sizeof(delim) == 1 && isascii(delim)) ? delim[1] % UInt8 : delim
@@ -278,32 +299,47 @@ end
         df = dateformat isa AbstractVector || dateformat isa AbstractDict ? nothing : dateformat
+        wh1 = UInt8(' ')
+        wh2 = UInt8('\t')
+        if sentinel isa Vector
+            for sent in sentinel
+                if contains(sent, " ")
+                    wh1 = 0x00
+                end
+                if contains(sent, "\t")
+                    wh2 = 0x00
+                end
+            end
+        end
+        headerpos = datapos = pos
         if !transpose
             # step 1: detect the byte position where the column names start (headerpos)
             # and where the first data row starts (datapos)
             headerpos, datapos = detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, header, skipto)
             debug && println("headerpos = $headerpos, datapos = $datapos")
-
-            # step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
-            d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
-            debug && println("estimated rows: $rowsguess")
-            debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")
-
-            # step 3: build Parsers.Options w/ parsing arguments
-            wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
-            wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
-            if sentinel isa Vector
-                for sent in sentinel
-                    if contains(sent, " ")
-                        wh1 = 0x00
-                    end
-                    if contains(sent, "\t")
-                        wh2 = 0x00
-                    end
-                end
-            end
+        end
+        # step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
+        # step 3: build Parsers.Options w/ parsing arguments
+        if del isa UInt8
+            d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows, del)
+            wh1 = d == UInt(' ') ? 0x00 : wh1
+            wh2 = d == UInt8('\t') ? 0x00 : wh2
             options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
+        elseif del isa Char
+            _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
+            options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
+            d = del
+        elseif del isa String
+            _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
+            options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
+            d = del
+        else
+            error("invalid delim type")
+        end
+        debug && println("estimated rows: $rowsguess")
+        debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")
+        if !transpose
             # step 4a: if we're ignoring repeated delimiters, then we ignore any
             # that start a row, so we need to check if we need to adjust our headerpos/datapos
            if ignorerepeated
@@ -318,10 +354,6 @@ end
             ncols = length(names)
         else
             # transpose
-            d, rowsguess = detectdelimandguessrows(buf, pos, pos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
-            wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
-            wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
-            options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
             rowsguess, names, positions, endpositions = detecttranspose(buf, pos, len, options, header, skipto, normalizenames)
             ncols = length(names)
             datapos = isempty(positions) ? 0 : positions[1]
@@ -357,7 +389,7 @@ end
         else
             T = types === nothing ? (streaming ? Union{stringtype, Missing} : NeedsTypeDetection) : types
             columns = Vector{Column}(undef, ncols)
-            foreach(1:ncols) do i
+            for i = 1:ncols
                 col = Column(T, options)
                 columns[i] = col
             end
@@ -427,7 +459,7 @@ end
                 end
             elseif select isa Base.Callable
                 for i = 1:ncols
-                    select(i, names[i]) || willdrop!(columns, i)
+                    select(i, names[i])::Bool || willdrop!(columns, i)
                 end
             else
                 throw(ArgumentError("`select` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
@@ -448,7 +480,7 @@ end
                 end
             elseif drop isa Base.Callable
                 for i = 1:ncols
-                    drop(i, names[i]) && willdrop!(columns, i)
+                    drop(i, names[i])::Bool && willdrop!(columns, i)
                 end
             else
                 throw(ArgumentError("`drop` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
@@ -459,7 +491,7 @@ end
         # determine if we can use threads while parsing
         limit = something(limit, typemax(Int64))
         minrows = min(limit, rowsguess)
-        nthreads = something(ntasks, Threads.nthreads())
+        nthreads = Int(something(ntasks, Threads.nthreads()))
         if ntasks === nothing && !streaming && nthreads > 1 && !transpose && minrows > (nthreads * 5) && (minrows * ncols) >= 5_000
             threaded = true
             ntasks = nthreads
@@ -487,6 +519,7 @@ end
             # but we also don't guarantee limit will be exact w/ multithreaded parsing
             origrowsguess = rowsguess
             if limit !== typemax(Int64)
+                limit = Int64(limit)
                 limitposguess = ceil(Int64, (limit / (origrowsguess * 0.8)) * len)
                 newlen = [0, limitposguess, min(limitposguess * 2, len)]
                 findrowstarts!(buf, options, newlen, ncols, columns, stringtype, downcast, 5)
@@ -495,7 +528,10 @@ end
                 debug && println("limiting, adjusting len to $len")
             end
             chunksize = div(len - datapos, ntasks)
-            chunkpositions = [i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i) for i = 0:ntasks]
+            chunkpositions = Vector{Int64}(undef, ntasks + 1)
+            for i = 0:ntasks
+                chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i)
+            end
             debug && println("initial byte positions before adjusting for start of rows: $chunkpositions")
             avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, downcast, rows_to_check)
             if successfullychunked
@@ -506,7 +542,9 @@ end
                 debug && println("multi-threaded column types sampled as: $columns")
                 # check if we need to adjust column pooling
                 if finalpool == 0.0 || finalpool == 1.0
-                    foreach(col -> col.pool = finalpool, columns)
+                    for col in columns
+                        col.pool = finalpool
+                    end
                 end
             else
                 debug && println("something went wrong chunking up a file for multithreaded parsing, falling back to single-threaded parsing")
@@ -521,7 +559,7 @@ end
    end # @inbounds begin
 
     return Context(
-        Val(transpose),
+        transpose,
         getname(source),
         names,
        rowsguess,
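[Note — src/context.jl] The changes above all push in one direction: fewer compiled specializations for identical behavior. `transpose` becomes a plain `Bool` field instead of a `Val`, every keyword argument gets a concrete type annotation, and the repeated `throw(ArgumentError(...))` is hoisted into the `@noinline nonconcretetypes` helper so the error path isn't inlined into callers. A toy sketch of the Val-to-Bool trade (illustrative names, not CSV.jl's API):

    # one compiled method instance per Val type parameter:
    step(pos, ::Val{transpose}) where {transpose} = transpose ? pos : pos + 1

    # one compiled method instance total; the Bool test is a cheap runtime branch:
    step(pos, transpose::Bool) = transpose ? pos : pos + 1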
diff --git a/src/detection.jl b/src/detection.jl
index fc6a778b..20a9c46d 100644
--- a/src/detection.jl
+++ b/src/detection.jl
@@ -1,5 +1,5 @@
 # figure out at what byte position the header row(s) start and at what byte position the data starts
-function detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, @nospecialize(header), skipto)
+function detectheaderdatapos(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, @nospecialize(header), skipto)
     headerpos = 0
     datapos = 1
     if header isa Integer
@@ -25,7 +25,7 @@ end
 # it tries to guess a file's delimiter by which character showed up w/ the same frequency
 # over all rows scanned; we use the average # of bytes per row w/ total length of the file
 # to guess the total # of rows in the file
-function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, delim, cmt, ignoreemptyrows)
+function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, delim=0x00)
     nbytes = 0
     lastbytenewline = false
     parsedanylines = false
@@ -115,6 +115,7 @@ function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, delim
         end
     end
     nlines += parsedanylines && !lastbytenewline
+    d = delim
     if delim == UInt8('\n')
         if nlines > 0
             d = UInt8('\n')
@@ -143,8 +144,6 @@ function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, delim
         else
             d = UInt8(',')
         end
-    else # delim explicitly provided
-        d = delim
     end
     guess = ((len - datapos) / (nbytes / nlines))
     rowsguess = isfinite(guess) ? ceil(Int, guess) : 0
@@ -162,7 +161,7 @@ function incr!(c::ByteValueCounter, b::UInt8)
 end
 
 # given the various header and normalization options, figure out column names for a file
-function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize(header), normalizenames)
+function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize(header), normalizenames)::Vector{Symbol}
     if header isa Union{AbstractVector{Symbol}, AbstractVector{String}}
         fields, pos = readsplitline(buf, datapos, len, options)
         isempty(header) && return [Symbol(:Column, i) for i = 1:length(fields)]
@@ -187,7 +186,7 @@ function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize(
 end
 
 # efficiently skip from `cur` to `dest` row
-function skiptorow(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, cur, dest)
+function skiptorow(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, cur, dest)
     nlines = Ref{Int}(0)
     pos = checkcommentandemptyline(buf, pos, len, cmt, ignoreemptyrows, nlines)
     cur += nlines[]
@@ -273,7 +272,7 @@ end
 
 const NLINES = Ref{Int}(0)
 
-function checkcommentandemptyline(buf, pos, len, cmt, ignoreemptyrows, nlines=NLINES)
+function checkcommentandemptyline(buf, pos, len, @nospecialize(cmt), ignoreemptyrows, nlines=NLINES)
     cmtptr, cmtlen = cmt === nothing ? (C_NULL, 0) : cmt
     ptr = pointer(buf, pos)
     while pos <= len
@@ -467,50 +466,7 @@ function findrowstarts!(buf, opts, ranges, ncols, columns, @nospecialize(stringt
             findchunkrowstart(ranges, i, buf, opts, downcast, ncols, rows_to_check, columns, lock, stringtype, totalbytes, totalrows, succeeded)
         end
     end
-    finalrows = totalrows[]
-    !succeeded[] && return totalbytes[] / finalrows, false
-    # alright, we successfully identified the starting byte positions for each chunk
-    # now let's take a look at our samples we parsed, and juice up our column types/flags
-    # for n = 1:ncols
-    #     col = columns[n]
-    #     type = col.type
-    #     # build up a refpool of initial values if applicable
-    #     # if not, or cardinality is too high, we'll set the column to non-pooled at this stage
-    #     if type === NeedsTypeDetection
-    #         # if the type is still NeedsTypeDetection, that means we only sampled `missing` values
-    #         # in that case, the type will stay NeedsTypeDetection and may be detected while parsing
-    #         # by individual chunks over the whole file
-    #         # as for pooling, we'll only pool in this case if user explicitly asked for pooling
-    #         # so maybepooled(col) or isnan(col.pool) won't turn into PooledArray for multithreading
-    #         # unless we sample something other than `missing` during this stage
-    #         # if !pooled(col)
-    #         #     col.pool = 0.0
-    #         # end
-    #     else
-    #         # if (pooled(col) || maybepooled(col) || (type isa StringTypes && pool != 0.0))
-    #         #     refpool = RefPool(type)
-    #         #     for m = 1:M
-    #         #         if isassigned(samples, m, n)
-    #         #             val = samples[m, n]
-    #         #             if val isa type || (val isa PosLen && type isa StringTypes)
-    #         #                 getref!(refpool, type, PosLenString(buf, val, opts.e))
-    #         #             end
-    #         #         end
-    #         #     end
-    #         #     poolval = !isnan(col.pool) ? col.pool : !isnan(pool) ? pool : DEFAULT_POOL
-    #         #     if pooled(col) || ((length(refpool.refs) - 1) / finalrows) <= poolval
-    #         #         col.refpool = refpool
-    #         #         col.pool = 1.0
-    #         #     else
-    #         #         col.pool = 0.0
-    #         #     end
-    #         # else
-    #         #     col.pool = 0.0
-    #         # end
-    #     end
-    #     col.type = type
-    # end
-    return totalbytes[] / finalrows, true
+    return totalbytes[] / totalrows[], succeeded[]
 end
 
 function detecttranspose(buf, pos, len, options, @nospecialize(header), skipto, normalizenames)
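[Note — src/detection.jl] `cmt` is either `nothing` or a `(Ptr{UInt8}, Int)` pair, so annotating it `@nospecialize` in these helpers lets one compiled instance serve both cases; likewise `delim` moves to a trailing default argument (`delim=0x00`) with `d = delim` replacing the old explicit `else` branch. A minimal sketch of the `@nospecialize` effect (toy function, not from the package):

    hascomment(@nospecialize(cmt)) = cmt !== nothing
    hascomment(nothing)         # both calls share one
    hascomment((C_NULL, 0))     # non-specialized compiled body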
diff --git a/src/file.jl b/src/file.jl
index 671f29ef..7d3a6e36 100644
--- a/src/file.jl
+++ b/src/file.jl
@@ -213,7 +213,8 @@ function File(source::ValidSources;
     # select=nothing;drop=nothing;limit=nothing;threaded=nothing;ntasks=Threads.nthreads();tasks=nothing;rows_to_check=30;lines_to_check=nothing;missingstrings=String[];missingstring="";
     # delim=nothing;ignorerepeated=false;quoted=true;quotechar='"';openquotechar=nothing;closequotechar=nothing;escapechar='"';dateformat=nothing;
     # dateformats=nothing;decimal=UInt8('.');truestrings=nothing;falsestrings=nothing;type=nothing;types=nothing;typemap=Dict{Type,Type}();
-    # pool=CSV.DEFAULT_POOL;downcast=false;lazystrings=false;stringtype=String;strict=false;silencewarnings=false;maxwarnings=100;debug=true;parsingdebug=false;
+    # pool=CSV.DEFAULT_POOL;downcast=false;lazystrings=false;stringtype=String;strict=false;silencewarnings=false;maxwarnings=100;debug=true;parsingdebug=false;buffer_in_memory=false
+    # @descend CSV.Context(CSV.Arg(source), CSV.Arg(header), CSV.Arg(normalizenames), CSV.Arg(datarow), CSV.Arg(skipto), CSV.Arg(footerskip), CSV.Arg(transpose), CSV.Arg(comment), CSV.Arg(ignoreemptyrows), CSV.Arg(ignoreemptylines), CSV.Arg(select), CSV.Arg(drop), CSV.Arg(limit), CSV.Arg(buffer_in_memory), CSV.Arg(threaded), CSV.Arg(ntasks), CSV.Arg(tasks), CSV.Arg(rows_to_check), CSV.Arg(lines_to_check), CSV.Arg(missingstrings), CSV.Arg(missingstring), CSV.Arg(delim), CSV.Arg(ignorerepeated), CSV.Arg(quoted), CSV.Arg(quotechar), CSV.Arg(openquotechar), CSV.Arg(closequotechar), CSV.Arg(escapechar), CSV.Arg(dateformat), CSV.Arg(dateformats), CSV.Arg(decimal), CSV.Arg(truestrings), CSV.Arg(falsestrings), CSV.Arg(type), CSV.Arg(types), CSV.Arg(typemap), CSV.Arg(pool), CSV.Arg(downcast), CSV.Arg(lazystrings), CSV.Arg(stringtype), CSV.Arg(strict), CSV.Arg(silencewarnings), CSV.Arg(maxwarnings), CSV.Arg(debug), CSV.Arg(parsingdebug), CSV.Arg(false))
     ctx = @refargs Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, threaded, ntasks, tasks, rows_to_check, lines_to_check, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, false)
     return File(ctx)
 end
@@ -223,7 +224,7 @@ function File(ctx::Context, chunking::Bool=false)
     # we now do our parsing pass over the file, starting at datapos
     if ctx.threaded
         # multithreaded parsing
-        rowsguess, ntasks, columns, limit = ctx.rowsguess, ctx.ntasks, ctx.columns, ctx.limit
+        rowsguess, ntasks, columns = ctx.rowsguess, ctx.ntasks, ctx.columns
         # calculate our guess for how many rows will be parsed by each concurrent parsing task
         rowchunkguess = cld(rowsguess, ntasks)
         wholecolumnslock = ReentrantLock() # in case columns are widened during parsing
@@ -233,49 +234,11 @@
         foreach(col -> col.lock = ReentrantLock(), columns)
         rows = zeros(Int64, ntasks) # how many rows each parsing task ended up actually parsing
         @sync for i = 1:ntasks
-            Threads.@spawn begin
-                tt = Base.time()
-                task_columns = [Column(col) for col in columns] # task-local columns derived from top-level columns
-                allocate!(task_columns, rowchunkguess)
-                pertaskcolumns[i] = task_columns
-                task_pos = ctx.chunkpositions[i]
-                task_len = ctx.chunkpositions[i + 1] - (i != ntasks)
-                # for error-reporting purposes, we want to try and give the best guess of where a row emits a warning/error, so compute that
-                rowchunkoffset = (ctx.datarow - 1) + (rowchunkguess * (i - 1))
-                task_rows, task_pos = parsefilechunk!(ctx, ctx.buf, task_pos, task_len, rowchunkguess, rowchunkoffset, task_columns, ctx.transpose, ctx.customtypes)
-                rows[i] = task_rows
-                # promote column types/flags this task detected while parsing
-                lock(wholecolumnslock) do
-                    # check if this task widened columns while parsing
-                    if length(task_columns) > length(columns)
-                        for j = (length(columns) + 1):length(task_columns)
-                            col = task_columns[j] # I'm pretty sure it's ok to just use the per-task column directly here as new top-level column?
-                            # initialize lock since it hasn't been initialized yet
-                            col.lock = ReentrantLock()
-                            push!(columns, col)
-                        end
-                    end
-                end
-                # now we know that columns is at least as long as task_columns
-                for j = 1:length(task_columns)
-                    col = columns[j]
-                    # note col.lock is shared amongst all tasks (i.e. belongs to parent columns[i].lock)
-                    lock(col.lock) do
-                        task_col = task_columns[j]
-                        T = col.type
-                        col.type = something(promote_types(T, task_col.type), ctx.stringtype)
-                        if T !== col.type
-                            ctx.debug && println("promoting col = $j from $T to $(col.type), task chunk ($i) was type = $(task_col.type)")
-                        end
-                        col.anymissing |= task_col.anymissing
-                    end
-                end
-                ctx.debug && println("finished parsing $task_rows rows on task = $i: time for parsing: $(Base.time() - tt)")
-            end
+            Threads.@spawn multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock)
         end
         finalrows = sum(rows)
         if ctx.limit < finalrows
-            finalrows = limit
+            finalrows = ctx.limit
         end
         # ok, all the parsing tasks have finished and we've promoted their types w/ the top-level columns
         # so now we just need to finish processing each column by making ChainedVectors of the individual columns
@@ -291,93 +254,8 @@ function File(ctx::Context, chunking::Bool=false)
             end
         end
         @sync for (j, col) in enumerate(columns)
-            Threads.@spawn begin
-                for i = 1:ntasks
-                    task_columns = pertaskcolumns[i]
-                    task_col = task_columns[j]
-                    task_rows = rows[i]
-                    # check if we need to promote a task-local column based on what other threads parsed
-                    T = col.type # final promoted type from amongst all separate parsing tasks
-                    T2 = task_col.type
-                    if T isa StringTypes && !(T2 isa StringTypes)
-                        # promoting non-string to string column
-                        ctx.debug && println("multithreaded promoting column $j to string from $T2")
-                        task_len = ctx.chunkpositions[i + 1] - (i != ntasks)
-                        task_pos = ctx.chunkpositions[i]
-                        promotetostring!(ctx, ctx.buf, task_pos, task_len, task_rows, (ctx.datarow - 1) + (rowchunkguess * (i - 1)), task_columns, ctx.transpose, ctx.customtypes, j, Ref(0), task_rows, T)
-                    elseif T === Float64 && T2 <: Integer
-                        # one chunk parsed as Int, another as Float64, promote to Float64
-                        ctx.debug && println("multithreaded promoting column $j to float")
-                        if pooled(col)
-                            task_col.refpool.refs = convert(Refs{Float64}, task_col.refpool.refs)
-                        else
-                            task_col.column = convert(SentinelVector{Float64}, task_col.column)
-                        end
-                    elseif T !== T2 && T <: InlineString
-                        if task_col.column isa Vector{UInt32}
-                            task_col.refpool.refs = convert(Refs{T}, task_col.refpool.refs)
-                        else
-                            task_col.column = convert(SentinelVector{T}, task_col.column)
-                        end
-                    elseif T !== T2
-                        # one chunk parsed all missing values, but another chunk had a typed value, promote to that
-                        # while keeping all values `missing` (allocate by default ensures columns have all missing values)
-                        ctx.debug && println("multithreaded promoting column $j from missing on task $i")
-                        task_col.column = allocate(pooled(col) ? Pooled : T, task_rows)
-                    end
-                    # synchronize refs if needed
-                    if pooled(col) || (isnan(col.pool) && col.type isa StringTypes)
-                        if !isdefined(col, :refpool) && isdefined(task_col, :refpool)
-                            # this case only occurs if user explicitly passed pool=true
-                            # but we only parsed `missing` values during sampling
-                            col.refpool = task_col.refpool
-                        elseif isdefined(col, :refpool) && pooltype(col) !== T && isdefined(task_col, :refpool)
-                            # we pooled/detected one type while sampling, but parsing promoted to another type
-                            # if the task_col has a refpool, then we know it's the promoted type
-                            col.refpool = task_col.refpool
-                        elseif isdefined(task_col, :refpool)
-                            syncrefs!(col.type, col, task_col, task_rows)
-                        end
-                    end
-                end
-@label threadedprocesscolumn
-                if pooled(col)
-                    makechain!(col.type, pertaskcolumns, col, j, ntasks)
-                    # pooled columns are the one case where we invert the order of arrays;
-                    # i.e. we return PooledArray{T, ChainedVector{T}} instead of ChainedVector{T, PooledArray{T}}
-                    makepooled!(col)
-                elseif isnan(col.pool) && col.type isa StringTypes
-                    poolval = !isnan(col.pool) ? col.pool : !isnan(ctx.pool) ? ctx.pool : DEFAULT_POOL
-                    if ((length(col.refpool.refs) - 1) / finalrows) <= poolval
-                        col.pool = 1.0
-                        @goto threadedprocesscolumn
-                    else
-                        # cardinality too high, so unpool
-                        col.pool = 0.0
-                        #!!!!FIXME: need to pass in parent refpool, not pertaskcolumn refpools
-                        foreach(cols -> unpool!(cols[j], col.type, col.refpool), pertaskcolumns)
-                        @goto threadedprocesscolumn
-                    end
-                elseif col.type === Int64
-                    # we need to special-case Int64 here because while parsing, a default Int64 sentinel value is chosen to
-                    # represent missing; if any chunk bumped into that sentinel value while parsing, then it cycled to a
-                    # new sentinel value; this step ensure that each chunk has the same encoded sentinel value
-                    # passing force=false means it will first check if all chunks already have the same sentinel and return
-                    # immediately if so, which will be the case most often
-                    SentinelArrays.newsentinel!((pertaskcolumns[i][j].column::SVec{Int64} for i = 1:ntasks)...; force=false)
-                    makechain!(col.type, pertaskcolumns, col, j, ntasks)
-                elseif col.type === PosLenString
-                    col.column = ChainedVector(PosLenStringVector{coltype(col)}[makeposlen!(pertaskcolumns[i][j], coltype(col), ctx) for i = 1:ntasks])
-                elseif col.type === NeedsTypeDetection || col.type === HardMissing
-                    col.type = Missing
-                    col.column = MissingVector(finalrows)
-                else
-                    makechain!(col.type, pertaskcolumns, col, j, ntasks)
-                end
-                if finalrows < length(col.column)
-                    # we only ever resize! down here, so no need to use reallocate!
-                    resize!(col.column, finalrows)
-                end
-            end
+            let finalrows=finalrows
+                Threads.@spawn multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, rowchunkguess, finalrows, j, col)
+            end
         end
     else
@@ -385,7 +263,7 @@ function File(ctx::Context, chunking::Bool=false)
         # single-threaded parsing
         columns = ctx.columns
         allocate!(columns, ctx.rowsguess)
         t = Base.time()
-        finalrows, pos = parsefilechunk!(ctx, ctx.buf, ctx.datapos, ctx.len, ctx.rowsguess, 0, columns, ctx.transpose, ctx.customtypes)
+        finalrows, pos = parsefilechunk!(ctx, ctx.datapos, ctx.len, ctx.rowsguess, 0, columns, ctx.customtypes)::Tuple{Int64, Int64}
         ctx.debug && println("time for initial parsing: $(Base.time() - t)")
         # cleanup our columns if needed
         for col in columns
@@ -456,7 +334,137 @@ end
     return File(ctx.name, names, types, finalrows, length(columns), columns, lookup)
 end
 
-const EMPTY_INT_ARRAY = Int64[]
+function multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock)
+    columns = ctx.columns
+    tt = Base.time()
+    task_columns = [Column(col) for col in columns] # task-local columns derived from top-level columns
+    allocate!(task_columns, rowchunkguess)
+    pertaskcolumns[i] = task_columns
+    task_pos = ctx.chunkpositions[i]
+    task_len = ctx.chunkpositions[i + 1] - (i != ctx.ntasks)
+    # for error-reporting purposes, we want to try and give the best guess of where a row emits a warning/error, so compute that
+    rowchunkoffset = (ctx.datarow - 1) + (rowchunkguess * (i - 1))
+    task_rows, task_pos = parsefilechunk!(ctx, task_pos, task_len, rowchunkguess, rowchunkoffset, task_columns, ctx.customtypes)::Tuple{Int64, Int64}
+    rows[i] = task_rows
+    # promote column types/flags this task detected while parsing
+    lock(wholecolumnslock) do
+        # check if this task widened columns while parsing
+        if length(task_columns) > length(columns)
+            for j = (length(columns) + 1):length(task_columns)
+                col = task_columns[j] # I'm pretty sure it's ok to just use the per-task column directly here as new top-level column?
+                # initialize lock since it hasn't been initialized yet
+                col.lock = ReentrantLock()
+                push!(columns, col)
+            end
+        end
+    end
+    # now we know that columns is at least as long as task_columns
+    for j = 1:length(task_columns)
+        col = columns[j]
+        # note col.lock is shared amongst all tasks (i.e. belongs to parent columns[i].lock)
+        lock(col.lock) do
+            task_col = task_columns[j]
+            T = col.type
+            col.type = something(promote_types(T, task_col.type), ctx.stringtype)
+            if T !== col.type
+                ctx.debug && println("promoting col = $j from $T to $(col.type), task chunk ($i) was type = $(task_col.type)")
+            end
+            col.anymissing |= task_col.anymissing
+        end
+    end
+    ctx.debug && println("finished parsing $task_rows rows on task = $i: time for parsing: $(Base.time() - tt)")
+    return
+end
+
+function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, rowchunkguess, finalrows, j, col)
+    for i = 1:ntasks
+        task_columns = pertaskcolumns[i]
+        task_col = task_columns[j]
+        task_rows = rows[i]
+        # check if we need to promote a task-local column based on what other threads parsed
+        T = col.type # final promoted type from amongst all separate parsing tasks
+        T2 = task_col.type
+        if T isa StringTypes && !(T2 isa StringTypes)
+            # promoting non-string to string column
+            ctx.debug && println("multithreaded promoting column $j to string from $T2")
+            task_len = ctx.chunkpositions[i + 1] - (i != ntasks)
+            task_pos = ctx.chunkpositions[i]
+            promotetostring!(ctx, ctx.buf, task_pos, task_len, task_rows, (ctx.datarow - 1) + (rowchunkguess * (i - 1)), task_columns, ctx.customtypes, j, Ref(0), task_rows, T)
+        elseif T === Float64 && T2 <: Integer
+            # one chunk parsed as Int, another as Float64, promote to Float64
+            ctx.debug && println("multithreaded promoting column $j to float")
+            if pooled(col)
+                task_col.refpool.refs = convert(Refs{Float64}, task_col.refpool.refs)
+            else
+                task_col.column = convert(SentinelVector{Float64}, task_col.column)
+            end
+        elseif T !== T2 && T <: InlineString
+            if task_col.column isa Vector{UInt32}
+                task_col.refpool.refs = convert(Refs{T}, task_col.refpool.refs)
+            else
+                task_col.column = convert(SentinelVector{T}, task_col.column)
+            end
+        elseif T !== T2
+            # one chunk parsed all missing values, but another chunk had a typed value, promote to that
+            # while keeping all values `missing` (allocate by default ensures columns have all missing values)
+            ctx.debug && println("multithreaded promoting column $j from missing on task $i")
+            task_col.column = allocate(pooled(col) ? Pooled : T, task_rows)
+        end
+        # synchronize refs if needed
+        if pooled(col) || (isnan(col.pool) && col.type isa StringTypes)
+            if !isdefined(col, :refpool) && isdefined(task_col, :refpool)
+                # this case only occurs if user explicitly passed pool=true
+                # but we only parsed `missing` values during sampling
+                col.refpool = task_col.refpool
+            elseif isdefined(col, :refpool) && pooltype(col) !== T && isdefined(task_col, :refpool)
+                # we pooled/detected one type while sampling, but parsing promoted to another type
+                # if the task_col has a refpool, then we know it's the promoted type
+                col.refpool = task_col.refpool
+            elseif isdefined(task_col, :refpool)
+                syncrefs!(col.type, col, task_col, task_rows)
+            end
+        end
+    end
+@label threadedprocesscolumn
+    if pooled(col)
+        makechain!(col.type, pertaskcolumns, col, j, ntasks)
+        # pooled columns are the one case where we invert the order of arrays;
+        # i.e. we return PooledArray{T, ChainedVector{T}} instead of ChainedVector{T, PooledArray{T}}
+        makepooled!(col)
+    elseif isnan(col.pool) && col.type isa StringTypes
+        poolval = !isnan(col.pool) ? col.pool : !isnan(ctx.pool) ? ctx.pool : DEFAULT_POOL
+        if ((length(col.refpool.refs) - 1) / finalrows) <= poolval
+            col.pool = 1.0
+            @goto threadedprocesscolumn
+        else
+            # cardinality too high, so unpool
+            col.pool = 0.0
+            foreach(cols -> unpool!(cols[j], col.type, col.refpool), pertaskcolumns)
+            @goto threadedprocesscolumn
+        end
+    elseif col.type === Int64
+        # we need to special-case Int64 here because while parsing, a default Int64 sentinel value is chosen to
+        # represent missing; if any chunk bumped into that sentinel value while parsing, then it cycled to a
+        # new sentinel value; this step ensures that each chunk has the same encoded sentinel value
+        # passing force=false means it will first check if all chunks already have the same sentinel and return
+        # immediately if so, which will be the case most often
+        SentinelArrays.newsentinel!((pertaskcolumns[i][j].column::SVec{Int64} for i = 1:ntasks)...; force=false)
+        makechain!(col.type, pertaskcolumns, col, j, ntasks)
+    elseif col.type === PosLenString
+        col.column = ChainedVector(PosLenStringVector{coltype(col)}[makeposlen!(pertaskcolumns[i][j], coltype(col), ctx) for i = 1:ntasks])
+    elseif col.type === NeedsTypeDetection || col.type === HardMissing
+        col.type = Missing
+        col.column = MissingVector(finalrows)
+    else
+        makechain!(col.type, pertaskcolumns, col, j, ntasks)
+    end
+    if finalrows < length(col.column)
+        # we only ever resize! down here, so no need to use reallocate!
+        resize!(col.column, finalrows)
+    end
+    return
+end
+
 const EMPTY_REFRECODE = UInt32[]
 
 # after multithreaded parsing, we need to synchronize pooled refs from different chunks of the file
@@ -573,7 +581,9 @@ function makeposlen!(col, T, ctx)
     return col.column
 end
 
-function parsefilechunk!(ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, TR::Val{transpose}, ::Type{customtypes}) where {transpose, customtypes}
+function parsefilechunk!(ctx::Context, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes})::Tuple{Int64, Int64} where {customtypes}
+    buf = ctx.buf
+    transpose = ctx.transpose
     limit = ctx.limit
     row = 0
     startpos = pos
@@ -582,7 +592,7 @@ function parsefilechunk!(ctx::Context, buf, pos, len, rowsguess, rowoffset, colu
     while true
         row += 1
         # @show columns
-        @inbounds pos = parserow(startpos, row, numwarnings, ctx, buf, pos, len, rowsguess, rowoffset, columns, TR, customtypes)
+        @inbounds pos = parserow(startpos, row, numwarnings, ctx, buf, pos, len, rowsguess, rowoffset, columns, customtypes)::Int64
         # @show columns
         row == limit && break
         (transpose ? all(c -> c.position >= c.endposition, columns) : pos > len) && break
@@ -616,12 +626,12 @@ end
 @noinline fatalerror(buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) fatal error, encountered an invalidly quoted field while parsing around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code)), check your `quotechar` arguments or manually fix the field in the file itself"))
 @noinline toomanywwarnings() = @warn("thread = $(Threads.threadid()): too many warnings, silencing any further warnings")
 
-Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, TR::Val{transpose}, ::Type{customtypes}) where {transpose, customtypes}
+Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes})::Int64 where {customtypes}
     # @show columns
     ncols = length(columns)
     for i = 1:ncols
         col = columns[i]
-        if transpose
+        if ctx.transpose
             pos = col.position
         end
         type = col.type
@@ -687,9 +697,9 @@ Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Cont
                 else
                     newT = ctx.stringtype
                 end
-                promotetostring!(ctx, buf, startpos, len, rowsguess, rowoffset, columns, TR, customtypes, i, numwarnings, row, newT)
+                promotetostring!(ctx, buf, startpos, len, rowsguess, rowoffset, columns, customtypes, i, numwarnings, row, newT)
             end
-        if transpose
+        if ctx.transpose
             col.position = pos
         else
             if i < ncols
@@ -943,7 +953,7 @@ end
         end
     end
 
-@noinline function promotetostring!(ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, TR::Val{transpose}, ::Type{customtypes}, column_to_promote, numwarnings, limit, stringtype) where {transpose, customtypes}
+@noinline function promotetostring!(ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes}, column_to_promote, numwarnings, limit, stringtype) where {customtypes}
     cols = [i == column_to_promote ? columns[i] : Column(Missing, columns[i].options) for i = 1:length(columns)]
     col = cols[column_to_promote]
     if pooled(col) || maybepooled(col) || isnan(col.pool)
@@ -958,7 +968,7 @@ end
     if pos <= len && len > 0
         while row < limit
             row += 1
-            @inbounds pos = parserow(startpos, row, numwarnings, ctx, buf, pos, len, rowsguess, rowoffset, cols, TR, customtypes)
+            @inbounds pos = parserow(startpos, row, numwarnings, ctx, buf, pos, len, rowsguess, rowoffset, cols, customtypes)
            pos > len && break
         end
     end
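[Note — src/file.jl] The two long `Threads.@spawn begin ... end` closures are hoisted into the named, precompilable functions `multithreadparse` and `multithreadpostparse`, and the second spawn is wrapped in `let finalrows=finalrows`. The `let` matters because `finalrows` is conditionally reassigned (`finalrows = ctx.limit`) before the spawn; a closure capturing a reassigned local gets a `Core.Box` and loses type information. A toy illustration with made-up names:

    function boxed()
        x = 1
        x = 2                  # reassignment: closures capturing x see a Core.Box
        t1 = @async x + 1      # x is boxed inside this task
        let x = x              # rebinds x to a fresh, never-reassigned local
            t2 = @async x + 1  # x is a plain Int inside this task
            return fetch(t1) + fetch(t2)
        end
    end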
diff --git a/src/precompile.jl b/src/precompile.jl
index 85171246..364b9c2a 100644
--- a/src/precompile.jl
+++ b/src/precompile.jl
@@ -1,31 +1,7 @@
+const PRECOMPILE_DATA = "int,float,date,datetime,bool,null,str,catg,int_float\n1,3.14,2019-01-01,2019-01-01T01:02:03,true,,hey,abc,2\n2,NaN,2019-01-02,2019-01-03T01:02:03,false,,there,abc,3.14\n"
 function _precompile_()
-    ccall(:jl_generating_output, Cint, ()) == 1 || return nothing
-    @assert Base.precompile(Tuple{typeof(parsefilechunk!),Context,Vector{UInt8},Int64,Int64,Int64,Int64,Vector{Column},Val{false},Type{Tuple{}}}) # time: 1.0312017
-    @assert Base.precompile(Tuple{typeof(parsevalue!),Type{BigFloat},Vector{UInt8},Int64,Int64,Int64,Int64,Int64,Column,Context}) # time: 0.5886147
-    @assert Base.precompile(Tuple{typeof(detect),String}) # time: 0.14655608
-    @assert Base.precompile(Tuple{typeof(write),Base.BufferStream,NamedTuple{(:a, :b), Tuple{Vector{Int64}, Vector{Float64}}}}) # time: 0.06289695
-    @assert Base.precompile(Tuple{typeof(parsevalue!),Type{BigInt},Vector{UInt8},Int64,Int64,Int64,Int64,Int64,Column,Context}) # time: 0.0527356
-    @assert Base.precompile(Tuple{typeof(getname),Cmd}) # time: 0.050515886
-    @assert Base.precompile(Tuple{typeof(write),IOBuffer,NamedTuple{(:x,), Tuple{Vector{Char}}}}) # time: 0.035573076
-    @assert Base.precompile(Tuple{typeof(parsevalue!),Type{UInt32},Vector{UInt8},Int64,Int64,Int64,Int64,Int64,Column,Context}) # time: 0.035170615
-    @assert Base.precompile(Tuple{Type{File},Vector{IOBuffer}}) # time: 0.033758428
-    @assert Base.precompile(Tuple{Type{File},Context,Bool}) # time: 0.033758428
-    @assert Base.precompile(Tuple{Type{Context},Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg,Arg})
-    @assert Base.precompile(Tuple{typeof(write),Base.Process,NamedTuple{(:col1, :col2, :col3), Tuple{Vector{Int64}, Vector{Int64}, Vector{Int64}}}}) # time: 0.03311246
-    @assert Base.precompile(Tuple{typeof(detectcolumnnames),Vector{UInt8},Int64,Int64,Int64,Parsers.Options,Any,Bool}) # time: 0.027399136
-    @assert Base.precompile(Tuple{typeof(findchunkrowstart),Vector{Int64},Int64,Vector{UInt8},Parsers.Options,Bool,Int64,Int64,Vector{Column},ReentrantLock,Any,Base.Threads.Atomic{Int64},Base.Threads.Atomic{Int64},Base.Threads.Atomic{Bool}}) # time: 0.026987862
-    @assert Base.precompile(Tuple{typeof(write),String,Vector{NamedTuple{(:a,), Tuple{String}}}}) # time: 0.022341058
-    @assert Base.precompile(Tuple{typeof(makepooled!),Column}) # time: 0.020273618
-    @assert Base.precompile(Tuple{typeof(unpool!),Column,Type,RefPool}) # time: 0.019640453
-    @assert Base.precompile(Tuple{typeof(Tables.getcolumn),File,Symbol}) # time: 0.018873245
-    @assert Base.precompile(Tuple{typeof(getsource),Vector{UInt8},Bool}) # time: 0.013699824
-    @assert Base.precompile(Tuple{typeof(detectheaderdatapos),Vector{UInt8},Int64,Int64,UInt8,UInt8,UInt8,Tuple{Ptr{UInt8}, Int64},Bool,Any,Int64}) # time: 0.012563237
-    @assert Base.precompile(Tuple{typeof(detectheaderdatapos),Vector{UInt8},Int64,Int64,UInt8,UInt8,UInt8,Nothing,Bool,Any,Int64}) # time: 0.011714202
-    @assert Base.precompile(Tuple{typeof(Tables.schema),File}) # time: 0.011355454
-    @assert Base.precompile(Tuple{typeof(detectdelimandguessrows),Vector{UInt8},Int64,Int64,Int64,UInt8,UInt8,UInt8,UInt8,Tuple{Ptr{UInt8}, Int64},Bool}) # time: 0.010538074
-    @assert Base.precompile(Tuple{Type{Context},Val{false},String,Vector{Symbol},Int64,Int64,Vector{UInt8},Int64,Int64,Int64,Parsers.Options,Vector{Column},Float64,Bool,Type,Dict{DataType, DataType},Type,Int64,Bool,Int64,Vector{Int64},Bool,Bool,Int64,Bool,Nothing,Bool}) # time: 0.010367362
-    @assert Base.precompile(Tuple{typeof(detectdelimandguessrows),Vector{UInt8},Int64,Int64,Int64,UInt8,UInt8,UInt8,UInt8,Nothing,Bool}) # time: 0.010337504
-    @assert Base.precompile(Tuple{typeof(warning),Type,Vector{UInt8},Int64,Int64,Int16,Int64,Int64}) # time: 0.008919449
-    @assert Base.precompile(Tuple{Type{Rows},String}) # time: 0.008758181
-    @assert Base.precompile(Tuple{Type{Rows},IOBuffer}) # time: 0.008604496
+    # ccall(:jl_generating_output, Cint, ()) == 1 || return nothing
+    # CSV.File(IOBuffer(PRECOMPILE_DATA))
+    # foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA)))
+    # CSV.File(joinpath(dirname(pathof(CSV)), "..", "test", "testfiles", "promotions.csv"))
 end
diff --git a/src/rows.jl b/src/rows.jl
index 87c0c5e8..e911165b 100644
--- a/src/rows.jl
+++ b/src/rows.jl
@@ -6,7 +6,7 @@ end
 # no automatic type inference is done, but types are allowed to be passed
 # for as many columns as desired; `CSV.detect(row, i)` can also be used to
 # use the same inference logic used in `CSV.File` for determing a cell's typed value
-struct Rows{transpose, IO, customtypes, V, stringtype}
+struct Rows{IO, customtypes, V, stringtype}
     name::String
     names::Vector{Symbol} # only includes "select"ed columns
     columns::Vector{Column}
@@ -134,7 +134,7 @@ function Rows(source::ValidSources;
             rm(x.file; force=true)
         end
     end
-    return Rows{ctx.transpose, typeof(ctx.buf), ctx.customtypes, eltype(values), ctx.stringtype}(
+    return Rows{typeof(ctx.buf), ctx.customtypes, eltype(values), ctx.stringtype}(
         ctx.name,
         ctx.names,
        ctx.columns,
@@ -188,7 +188,7 @@ Base.IteratorSize(::Type{<:Rows}) = Base.SizeUnknown()
     end
 end
 
-function checkwidencolumns!(r::Rows{t, ct, V}, cols) where {t, ct, V}
+function checkwidencolumns!(r::Rows{ct, V}, cols) where {ct, V}
     if cols > length(r.values)
         # we widened while parsing this row, need to widen other supporting objects
         for i = (length(r.values) + 1):cols
@@ -202,9 +202,9 @@ end
     return
 end
 
-@inline function Base.iterate(r::Rows{transpose, IO, customtypes, V, stringtype}, (pos, len, row)=(r.datapos, r.len, 1)) where {transpose, IO, customtypes, V, stringtype}
+@inline function Base.iterate(r::Rows{IO, customtypes, V, stringtype}, (pos, len, row)=(r.datapos, r.len, 1)) where {IO, customtypes, V, stringtype}
     (pos > len || row > r.limit) && return nothing
-    pos = parserow(1, 1, r.numwarnings, r.ctx, r.buf, pos, len, 1, r.datarow + row - 2, r.columns, transpose, customtypes)
+    pos = parserow(1, 1, r.numwarnings, r.ctx, r.buf, pos, len, 1, r.datarow + row - 2, r.columns, customtypes)
     columns = r.columns
     cols = length(columns)
     checkwidencolumns!(r, cols)
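[Note — src/precompile.jl, src/rows.jl] The brittle list of `Base.precompile` signatures — several of which reference the now-deleted `Val{false}`/`TR` arguments and would no longer match any method — is replaced by a `PRECOMPILE_DATA` sample and workload-style calls, currently commented out both here and in `__init__`. Uncommented, the intended shape would presumably be (a sketch assembled from the commented lines, not tested):

    function _precompile_()
        ccall(:jl_generating_output, Cint, ()) == 1 || return nothing
        CSV.File(IOBuffer(PRECOMPILE_DATA))
        foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA)))
        return nothing
    end

`Rows` correspondingly drops its `transpose` type parameter, mirroring the `Context.transpose::Bool` change.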
diff --git a/src/utils.jl b/src/utils.jl
index e200c54c..9292b156 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -21,11 +21,14 @@
 coltype(col) = ifelse(col.anymissing, Union{finaltype(col.type), Missing}, finaltype(col.type))
 pooled(col) = col.pool == 1.0
 maybepooled(col) = 0.0 < col.pool < 1.0
-getpool(x::Bool) = x ? 1.0 : 0.0
-function getpool(x::Real)
-    y = Float64(x)
-    (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool argument must be in the range: 0.0 <= x <= 1.0"))
-    return y
+function getpool(x::Real)::Float64
+    if x isa Bool
+        return x ? 1.0 : 0.0
+    else
+        y = Float64(x)
+        (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool argument must be in the range: 0.0 <= x <= 1.0"))
+        return y
+    end
 end
 
 tupcat(::Type{Tuple{}}, S) = Tuple{S}
@@ -61,7 +64,7 @@ isinttype(T) = T === Int8 || T === Int16 || T === Int32 || T === Int64 || T ===
 end
 
 # when users pass non-standard types, we need to keep track of them in a Tuple{...} to generate efficient custom parsing kernel codes
-function nonstandardtype(T)
+@inline function nonstandardtype(T)
     T = nonmissingtype(T)
     if T === Union{} ||
        T isa StringTypes ||
@@ -111,35 +114,43 @@ function allocate!(columns, rowsguess)
     return
 end
 
-# MissingVector is an efficient representation in SentinelArrays.jl package
-allocate(::Type{NeedsTypeDetection}, len) = MissingVector(len)
-allocate(::Type{HardMissing}, len) = MissingVector(len)
-allocate(::Type{Missing}, len) = MissingVector(len)
-function allocate(::Type{PosLenString}, len)
-    A = Vector{PosLen}(undef, len)
-    memset!(pointer(A), typemax(UInt8), sizeof(A))
-    return A
-end
-allocate(::Type{String}, len) = SentinelVector{String}(undef, len)
-allocate(::Type{Pooled}, len) = fill(UInt32(1), len) # initialize w/ all missing values
-allocate(::Type{Bool}, len) = Vector{Union{Missing, Bool}}(undef, len)
-allocate(::Type{T}, len) where {T <: SmallIntegers} = Vector{Union{Missing, T}}(undef, len)
-allocate(T, len) = SentinelVector{T}(undef, len)
-
-reallocate!(A, len) = resize!(A, len)
-function reallocate!(A::Vector{UInt32}, len)
-    oldlen = length(A)
-    resize!(A, len)
-    for i = (oldlen + 1):len
-        @inbounds A[i] = UInt32(1)
+@inline function allocate(T, len)
+    if T === NeedsTypeDetection || T === HardMissing || T === Missing
+        # MissingVector is an efficient representation in SentinelArrays.jl package
+        return MissingVector(len)
+    elseif T === PosLenString
+        A = Vector{PosLen}(undef, len)
+        memset!(pointer(A), typemax(UInt8), sizeof(A))
+        return A
+    elseif T === String
+        return SentinelVector{String}(undef, len)
+    elseif T === Pooled
+        return fill(UInt32(1), len) # initialize w/ all missing values
+    elseif T === Bool
+        return Vector{Union{Missing, Bool}}(undef, len)
+    elseif T <: SmallIntegers
+        return Vector{Union{Missing, T}}(undef, len)
+    else
+        return SentinelVector{T}(undef, len)
     end
-    return
 end
-# when reallocating, we just need to make sure the missing bit is set for lazy string PosLen
-function reallocate!(A::Vector{PosLen}, len)
-    oldlen = length(A)
-    resize!(A, len)
-    memset!(pointer(A, oldlen + 1), typemax(UInt8), (len - oldlen) * 8)
+
+function reallocate!(@nospecialize(A), len)
+    if A isa Vector{UInt32}
+        oldlen = length(A)
+        resize!(A, len)
+        # make sure new values are initialized to missing
+        for i = (oldlen + 1):len
+            @inbounds A[i] = UInt32(1)
+        end
+    elseif A isa Vector{PosLen}
+        oldlen = length(A)
+        resize!(A, len)
+        # when reallocating, we just need to make sure the missing bit is set for lazy string PosLen
+        memset!(pointer(A, oldlen + 1), typemax(UInt8), (len - oldlen) * 8)
+    else
+        resize!(A, len)
+    end
     return
 end
 
@@ -147,7 +158,7 @@ end
 consumeBOM(buf, pos) = (length(buf) >= 3 && buf[pos] == 0xef && buf[pos + 1] == 0xbb && buf[pos + 2] == 0xbf) ? pos + 3 : pos
 
 # whatever input is given, turn it into an AbstractVector{UInt8} we can parse with
-function getbytebuffer(x, buffer_in_memory)
+@inline function getbytebuffer(x, buffer_in_memory)
     tfile = nothing
     if x isa Vector{UInt8}
         return x, 1, length(x), tfile
@@ -183,8 +194,8 @@ function getbytebuffer(x, buffer_in_memory)
     end
 end
 
-function getsource(x, buffer_in_memory)
-    buf, pos, len, tfile = getbytebuffer(x, buffer_in_memory)
+function getsource(@nospecialize(x), buffer_in_memory)
+    buf, pos, len, tfile = getbytebuffer(x, buffer_in_memory)::Tuple{Vector{UInt8},Int64,Int64,Union{Nothing,String}}
     if length(buf) >= 2 && buf[1] == 0x1f && buf[2] == 0x8b
         # gzipped source, gunzip it
         if buffer_in_memory
@@ -198,7 +209,7 @@ function getsource(x, buffer_in_memory)
     return buf, pos, len, tfile
 end
 
-function buffer_to_tempfile(codec, x)
+@inline function buffer_to_tempfile(codec, x)
     file, output = mktemp()
     stream = CodecZlib.TranscodingStream(codec, output)
     Base.write(stream, x)
@@ -206,10 +217,15 @@ end
     return Mmap.mmap(file), file
 end
 
-getname(buf::AbstractVector{UInt8}) = ""
-getname(cmd::Cmd) = string(cmd)
-getname(str) = string(str)
-getname(io::I) where {I <: IO} = string("<", I, ">")
+@inline function getname(x)
+    if x isa AbstractVector{UInt8}
+        return ""
+    elseif x isa IO
+        return string("<", typeof(x), ">")
+    else
+        return string(x)
+    end
+end
 
 # normalizing column name utilities
 const RESERVED = Set(["local", "global", "export", "let",
@@ -457,7 +473,7 @@ macro refargs(ex)
     (ex.head == :call || ex.head == :function) || throw(ArgumentError("@refargs ex must be function call or definition"))
     if ex.head == :call
         for i = 2:length(ex.args)
-            ex.args[i] = Expr(:call, :Arg, ex.args[i])
+            ex.args[i] = Expr(:call, :(CSV.Arg), ex.args[i])
         end
         return esc(ex)
     else # ex.head == :function
@@ -468,10 +484,10 @@ macro refargs(ex)
             (arg isa Symbol || arg.head == :(::)) || throw(ArgumentError("unsupported argument expression: `$arg`"))
             nm = arg isa Symbol ? arg : arg.args[1]
             T = arg isa Symbol ? :Any : arg.args[2]
-            push!(refs.args, Expr(:(=), nm, Expr(:(::), Expr(:ref, nm), T)))
-            fargs[i] = Expr(:(::), nm, :Arg)
+            push!(refs.args, Expr(:(=), Expr(:(::), nm, T), Expr(:ref, nm)))
+            fargs[i] = Expr(:(::), nm, :(CSV.Arg))
         end
         pushfirst!(ex.args[2].args, refs)
         return esc(ex)
     end
-end
+end
\ No newline at end of file
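[Note — src/utils.jl] The method families `allocate`, `reallocate!`, `getname`, and `getpool` collapse into single functions that branch on the argument (`T === ...` / `isa`), trading multiple dispatch for `if`/`elseif` chains: one compiled body instead of a method per type, plus `@nospecialize` where the concrete argument type doesn't help. A toy contrast with hypothetical names:

    # before: one method per element type
    alloc(::Type{Bool}, n) = Vector{Union{Missing, Bool}}(undef, n)
    alloc(::Type{T}, n) where {T} = Vector{Union{Missing, T}}(undef, n)

    # after: a single branching body covering the same cases
    function alloc2(T, n)
        if T === Bool
            return Vector{Union{Missing, Bool}}(undef, n)
        else
            return Vector{Union{Missing, T}}(undef, n)
        end
    end

The `@refargs` hunks also change `:Arg` to `:(CSV.Arg)` so the expansion refers unambiguously to `CSV.Arg`, and the generated unwrapping becomes `nm::T = nm[]`, asserting the declared type on the destructured value.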