[FEA] read_csv context-passing interface for distributed/segmented parsing #11728
Comments
@rjzamora This issue serves as our proposal for the new parsing-context-aware CSV reader. We would love to discuss with you (or another Dask stakeholder) in more detail.
Nice - I can't say I understand how the transition-vector logic works, but the general plan seems reasonable to me. My current understanding is that the corresponding dask graph would look something like this: we start by selecting overlapping byte ranges from the dataset (or perhaps they don't need to overlap?), then we map the per-range context scan and parsing steps over those ranges; a rough sketch is below. Is this the general algorithm you have in mind?
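A rough sketch of that graph with dask.delayed, where read_byte_range, read_csv_context, merge_row_contexts, parse_csv, and initial_context are placeholder callables standing in for the pieces discussed in this thread (none of them exist under these names in cudf or dask today):

```python
import dask


def build_graph(path, ranges, overlap, read_byte_range, read_csv_context,
                merge_row_contexts, parse_csv, initial_context):
    """Structural sketch only; all five callables are hypothetical placeholders."""
    # 1. One IO task per byte range, padded on the right so that a row
    #    straddling the boundary is still available to its partition.
    raw = [dask.delayed(read_byte_range)(path, start, stop + overlap)
           for start, stop in ranges]

    # 2. One "local context" task per range: scan only its own bytes and
    #    return the transition vector for that range.
    ctxs = [dask.delayed(read_csv_context)(r) for r in raw]

    # 3. Exclusive scan over the contexts: the parsing state at the start of
    #    range i is the composition of the contexts of ranges 0..i-1.
    #    (Written as a sequential chain here; a tree scan is also possible.)
    starts = [dask.delayed(initial_context)()]
    for ctx in ctxs[:-1]:
        starts.append(dask.delayed(merge_row_contexts)(starts[-1], ctx))

    # 4. Parse each range with the correct starting state.
    return [dask.delayed(parse_csv)(r, s) for r, s in zip(raw, starts)]
```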
A couple of points for completeness: […]
Thanks for clarifying. We will likely want to use a distinct IO task to physically pull each byte range from a (possibly remote) dataset. This means a given "read local context" task will only have access to the specific bytes that were read in the IO task it depends on. It seems like we would want to assign non-overlapping byte ranges to the "read local context" tasks, but that the IO tasks they depend on would need to have a bit of overlap (to deal with the "plus/minus one character" issue); see the sketch below.
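For illustration, the byte-range assignment under that scheme could look roughly like this (io_ranges is a hypothetical helper; pad is a guess at how far a row can spill past a boundary, which is exactly the fragile part discussed below):

```python
def io_ranges(file_size: int, npartitions: int, pad: int):
    """Non-overlapping "read local context" ranges plus padded IO ranges."""
    step = -(-file_size // npartitions)          # ceiling division
    logical = [(i * step, min((i + 1) * step, file_size))
               for i in range(npartitions)]
    # Each IO task reads a little past its logical range so that a row
    # crossing the boundary is still available to the dependent tasks.
    padded = [(start, min(stop + pad, file_size)) for start, stop in logical]
    return logical, padded
```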
Lightweight sounds good. A map-overlap pattern may still be preferred (if possible), since allgather operations are not particularly performant in Dask.
This sounds like a problematic limitation - if a byte range ends shortly after the start of a long row, your IO task may provide too little data. Are you worried about reading the same data twice here? Maybe we could also buffer that on the libcudf data_source level? To expand some more:
The very worst case I could imagine would be a single dask task's byte range that is completely contained within a row, so the corresponding output is empty, but its predecessor would need to read past that task's byte range.
You are correct that it will be possible to "break" the Dask logic by assigning byte ranges that are too small (or by assigning too little overlap). As far as I understand, this is already a potential problem in dask's existing read_csv logic. Now that I think about it, this is no longer the only alternative, because we are now able to perform partial IO from a file opened with […].
If we wanted to do this 100% cleanly, we could also extract row offsets in the parse context stage and exchange them between neighboring tasks/workers. Then the initial IO could be limited to the byte range + 1, and the subsequent parsing IO would know exactly which bytes to load a priori. Coming back to the communication patterns:
Overlaps could potentially work somewhat heuristically in most cases, but I'd be more comfortable with an always-safe solution, since the error-proneness across byte range boundaries was the original motivation for this discussion.
That makes sense. It would be fine for the first pass at the Dask algorithm to just execute a distinct task graph to calculate the correct byte range for each partition, followed by a conventional read_csv over those corrected ranges.
Is your feature request related to a problem? Please describe.
To parse files written in a regular language like CSV, we can only safely parse data inside a byte range if we know the parsing context from before this range. Without this information, we may accidentally interpret a record delimiter (newline) inside a quoted field as an actual delimiter.
More formally, when starting from a byte range, we don't know what state the DFA of the token language is in, so we need to store the transition vector starting from every possible state, and combine the vectors by function composition in the associative scan operation. This is pretty much identical to what is happening in the finite state transducer.
Instead of just running the scan on a full file, we can run it only on a byte range, and combine the resulting transition vectors in an exclusive scan over all byte ranges to establish local parsing context.
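As a minimal illustration of this algebra, here is a toy two-state model in Python (state 0 = outside a quoted field, 1 = inside one). The real reader tracks more states and packs the vectors into packed_rowctx_t; this only shows why composing per-range transition vectors in a scan recovers each range's starting state:

```python
STATES = (0, 1)


def scan_chunk(chunk: bytes) -> tuple:
    """Transition vector of a chunk: the end state for every possible start state."""
    vec = []
    for start in STATES:
        state = start
        for b in chunk:
            if b == ord('"'):        # in this toy, a quote simply toggles the state
                state ^= 1
        vec.append(state)
    return tuple(vec)


def merge(left: tuple, right: tuple) -> tuple:
    """Function composition (left's range comes first); this is associative."""
    return tuple(right[left[s]] for s in STATES)


chunks = [b'a,b,"c\n', b'd",e\nf,g\n']
vectors = [scan_chunk(c) for c in chunks]

# Exclusive scan: the state at the start of chunk i is the composition of
# vectors 0..i-1, read off at the initial DFA state (0).
prefix = (0, 1)                      # identity transition vector
for i, vec in enumerate(vectors):
    print(f"chunk {i} starts in state {prefix[0]}")
    prefix = merge(prefix, vec)
```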
Describe the solution you'd like
I want to propose a slight extension to the read_csv interface to handle this case:
- a class csv_parse_context that opaquely wraps packed_rowctx_t, only exposing the merge_row_contexts functionality to combine the parsing context from adjacent byte ranges, and a finalize operation that turns the transition vector into its value starting from the initial DFA state.
- a read_csv_context function that only scans the local byte range to compute its csv_parse_context transition vector. It can probably take the same parameters as read_csv.
- a csv_parse_context initial_parsing_state parameter to csv_reader_options that defaults to the initial state. The read_csv function can then use this initial state to determine record offsets and do the actual parsing.
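To make the intended call sequence concrete, here is a single-node toy mirroring the proposed surface (merge_row_contexts, finalize, and the two-pass flow), reusing scan_chunk from the sketch in the problem description above. The real csv_parse_context would opaquely wrap packed_rowctx_t in libcudf; none of these names exist there yet:

```python
class ToyParseContext:
    """Toy stand-in for the proposed csv_parse_context (two-state model)."""

    def __init__(self, vec=(0, 1)):                 # (0, 1) is the identity vector
        self.vec = vec

    def merge_row_contexts(self, nxt):
        """Compose with the context of the byte range immediately to the right."""
        return ToyParseContext(tuple(nxt.vec[s] for s in self.vec))

    def finalize(self):
        """Resolve the vector against the actual initial DFA state (state 0)."""
        return self.vec[0]


chunks = [b'a,b,"c\n', b'd",e\nf,g\n']

# Pass 1 -- the proposed read_csv_context: scan each byte range in isolation.
ctxs = [ToyParseContext(scan_chunk(c)) for c in chunks]

# Exclusive scan over the contexts; pass 2 would then call the existing
# read_csv on each range, seeded via the proposed initial_parsing_state.
running = ToyParseContext()
for i, ctx in enumerate(ctxs):
    print(f"range {i}: initial_parsing_state = {running.finalize()}")
    running = running.merge_row_contexts(ctx)
```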
Describe alternatives you've considered
Alternatively, we could implement backtracking by reading chunks before the byte range until we find an unambiguous parser state (one that is not the error state). In the worst case, this could mean reading the entire prefix of the file up to the byte range.
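For comparison, a rough sketch of this backtracking alternative, reusing the toy scan_chunk and merge helpers from above; read_bytes(start, stop) is a hypothetical callback returning raw bytes from the file:

```python
def find_start_state(read_bytes, range_start: int, chunk_size: int = 1 << 20) -> int:
    """Walk backwards in chunks from range_start until the accumulated
    transition vector maps every possible start state to the same end state,
    i.e. the state at range_start no longer depends on the unknown history.
    In the worst case this reads the entire prefix of the file. (In the
    two-state toy, quotes only ever toggle, so the vector never collapses and
    this always hits the worst case; the full CSV DFA also has an error state,
    which is what allows real inputs to disambiguate earlier.)
    """
    pos = range_start
    prefix = (0, 1)                                  # identity transition vector
    while pos > 0:
        lo = max(0, pos - chunk_size)
        # The new chunk lies to the left of what we already scanned, so it is
        # composed in front of the running prefix.
        prefix = merge(scan_chunk(read_bytes(lo, pos)), prefix)
        if len(set(prefix)) == 1:                    # every start state agrees
            return prefix[0]
        pos = lo
    return prefix[0]                                 # reached the file start: true start state is 0
```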
Additional context
This is relevant if we want dask.read_csv to be able to handle quoted record delimiters (i.e. newlines) where the opening quote occurs before the byte range. The interface has the advantage that it can be tested in isolation on a single node, without having to rely on dask.
The same kind of pattern could also apply to read_json, where on top of the regular parsing state, we also need to pass the stack transition operations from the beginning to the end of the byte range.