Add multi-partition `DataFrameScan` support to cuDF-Polars #17441
Conversation
```python
):
    raise ValueError(
        f"Engine configuration contains unsupported settings: {unsupported}"
    )
assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset(
    config.get("parquet_options", {})
)
assert {"num_rows_threshold"}.issuperset(config.get("parallel_options", {}))
```
I'd like to nest all multi-gpu options within the `"parallel_options"` key moving forward (to avoid adding more top-level keys).
We might imagine that these options are executor-specific. Does it make sense to have a nesting like:

`executor: str | tuple[str, dict]`

so that the executor argument is either a name, or a `("name", name-specific-options)` pair?
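A minimal sketch of how that `str | tuple[str, dict]` form might be normalized internally (hypothetical helper, not code from this PR):

```python
# Hypothetical normalization of the "name or (name, options)" executor form
# suggested above; requires Python 3.10+ for the union syntax.
ExecutorSpec = str | tuple[str, dict]

def normalize_executor(executor: ExecutorSpec) -> tuple[str, dict]:
    """Return a (name, options) pair regardless of which form was passed."""
    if isinstance(executor, str):
        return executor, {}
    name, options = executor
    return name, dict(options)
```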
That seems fine to me. Any opinion on this, @pentschev?
I do think it's a good idea to consider how the number of these options will inevitably grow over time (and that they will probably be executor-specific).
Hmm. The `str | tuple[str, dict]` logic actually feels a bit clumsy when I think about how to implement it. How about we just rename `"parallel_options"` to `"executor_options"` (to make it clear that the options are executor-specific)? This still allows us to validate that the specified arguments are actually supported by the "active" executor.
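A hedged sketch of what validating `"executor_options"` against the active executor could look like (the executor names and per-executor option sets are illustrative, not the real supported sets):

```python
# Illustrative mapping of executor name -> supported option keys; both the
# names and the option sets are assumptions for demonstration only.
SUPPORTED_EXECUTOR_OPTIONS = {
    "dask-experimental": {"num_rows_threshold"},
    "pylibcudf": set(),
}

def validate_executor_options(executor: str, options: dict) -> None:
    supported = SUPPORTED_EXECUTOR_OPTIONS.get(executor, set())
    unsupported = set(options) - supported
    if unsupported:
        raise ValueError(
            f"Executor {executor!r} does not support options: {sorted(unsupported)}"
        )
```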
As much as I agree that it is indeed clumsy, it feels like we'll soon need nested options and will inevitably make `"executor_options"` require accepting `str | tuple[str, dict]`, so we may as well just do that in `executor` and thereby allow as many levels of nested options as needed as part of `executor`. I think a better alternative may be an abstract base class `Executor` that we can specialize with the options we need for each executor.
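A minimal sketch of the abstract-base-class idea (class names and default values are hypothetical, not taken from the cuDF-Polars codebase):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass

class Executor(ABC):
    """Hypothetical base class; each concrete executor carries its own options."""

    @property
    @abstractmethod
    def name(self) -> str: ...

@dataclass
class DaskExecutor(Executor):
    # "num_rows_threshold" is the option name from the diff above; the default
    # value here is made up.
    num_rows_threshold: int = 1_000_000

    @property
    def name(self) -> str:
        return "dask-experimental"
```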
> I think a better alternative may be an abstract base class `Executor` that we can specialize with the options we need for each executor.

I do think this is the best long-term solution, but I also don't think it will be difficult to migrate from the `"executor_options"` approach currently used in this PR.

I don't think I understand why it is inevitable that `"executor_options"` would need to accept `str | tuple[str, dict]`. However, I do see why it would be useful to attach all executor-specific options to an `Executor` object. That said, I don't really want to deal with serialization/etc. in this PR :)
> I don't think I understand why it is inevitable that `"executor_options"` would need to accept `str | tuple[str, dict]`.

It's possible I'm overestimating the number of options we'll end up introducing here, but once we need nested options we'll need something more complex like the `tuple[str, dict]`, or the abstract base class. That is why I think it's inevitable.
…a/cudf into cudf-polars-multi-dataframe-scan
cc @wence- - Interested to know how you feel about the pattern used here to define/use
Co-authored-by: Lawrence Mitchell <[email protected]>
…-multi-dataframe-scan
…a/cudf into cudf-polars-multi-dataframe-scan
I think this is basically good, I think my comments are a request for a bit more documentation on the rationale for certain choices.
```python
@lower_ir_node.register(IR)
def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    # Single-partition default (see: _lower_ir_single)
    return rec.state["default_mapper"](ir)
```
So we have two recursive transformers:

- `lower_ir_node` (can handle multi-partitions)
- this "default" mapper (cannot handle multi-partitions)

The idea is that we want a single-partition fallback for nodes where we're already defining a multi-partition handler. However, once we enter the "single-partition" state through this fallback, we can never leave it.

I think I understood why we needed to split between single- and multi-partition handlers, but can you explain it here please?
Ah - I just realized I misunderstood your earlier suggestion and messed this up a bit.

> The idea is that we want a single-partition fallback for nodes where we're already defining a multi-partition handler.

Sort of. I just want a clean/intuitive way to fall back to "common" logic for any IR type. When we don't have a multi-partition handler defined for the IR type in question, I'd like to fall back to single-partition logic that is defined in one place. That logic would raise an error if there is not actually one partition. If we do have a multi-partition handler defined, it may still make sense for that handler to call that same single-partition logic in some cases (e.g. when support for specific options is missing, or there is only one partition).

A similar pattern will emerge for "partition-wise" operations. We are not going to want to repeat this logic all over the place; instead, we are going to want to call a `_lower_ir_partitionwise` function from the multi-partition handler for the IR type in question (e.g. `Select`).
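A rough, self-contained sketch of the two shared helpers described above (simplified signatures and a stand-in `PartitionInfo`; the merged implementation differs):

```python
from dataclasses import dataclass

@dataclass
class PartitionInfo:  # stand-in for the PartitionInfo type used in this PR
    count: int

def _lower_ir_single(ir, children, partition_info):
    """Fallback for IR types with no multi-partition handler."""
    if any(partition_info[c].count > 1 for c in children):
        raise NotImplementedError(
            f"{type(ir).__name__} does not support multiple partitions."
        )
    partition_info[ir] = PartitionInfo(count=1)
    return ir, partition_info

def _lower_ir_partitionwise(ir, children, partition_info):
    """Shared logic for operations applied independently to each partition."""
    counts = {partition_info[c].count for c in children}
    if len(counts) > 1:
        raise NotImplementedError("Mismatched partition counts are not handled here.")
    partition_info[ir] = PartitionInfo(count=counts.pop() if counts else 1)
    return ir, partition_info
```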
Note that I rolled back the "default_mapper" change for now (in favor of more-explicit handling for the fallback case). Perhaps we can iron out our answer to this question in a PR that focuses on a "tricky" case like `Select`.
```python
assert_gpu_result_equal(df, engine=engine)

# Check partitioning
qir = Translator(df._ldf.visit(), engine).translate_ir()
```
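A hedged sketch of how the partitioning check might continue; the `lower_ir_graph` helper and the expected-count variable are assumptions about the test internals, not verified API:

```python
# Assumed continuation of the test above (names are assumptions, not verified):
# lower the translated IR and confirm the resulting partition count.
lowered_ir, partition_info = lower_ir_graph(qir)
assert partition_info[lowered_ir].count == expected_partition_count
```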
nit: we need to remember to check that we didn't get any errors. I will try and open a PR that does this automatically.
…-multi-dataframe-scan
…-multi-dataframe-scan
/merge
Adds multi-partition (partition-wise) `Select` support following the same design as #17441

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Lawrence Mitchell (https://github.com/wence-)

URL: #17495

Adds multi-partition `Scan` support following the same design as #17441

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Lawrence Mitchell (https://github.com/wence-)

URL: #17494
Description

Follow-up to #17262. Adds support for parallel `DataFrameScan` operations.

Checklist