Reorganize table providers by table format #1010
Conversation
TLDR: I really like the idea of splitting the details of reading formats from the "metadata management" (called "table format" in the linked doc) so that users of DataFusion can extend DataFusion to manage metadata in ways suited to their use. I thought, however, we were headed towards a slightly different abstraction. In terms of the document, https://docs.google.com/document/d/1Bd4-PLLH-pHj0BquMDsJ6cVr_awnxTuvwNJuWsTHxAQ/edit?usp=sharing, my biggest takeaway was that at the moment the user has to synchronously create the table, and I am wondering if the fetching of the metadata could be made asynchronous.
Correct, that will be the next step
Yes, that would really bring a huge amount of flexibility. A funny example: I have just added a sketch implementation of the partition pruning algorithm, and one interesting approach is to load the partitions into a table and prune them there. I am going to try to make that work.
Force-pushed from 0264171 to bb8b91c
@houqp @alamb @yjshen this is now getting closer to what I am targeting. Some notes for anyone who would want to go through the code:
Note that I am really trying not to add any features, just re-organize the code. But I think that once this is done, it will be fairly easy to add the Hive partitioning implementation and use the ObjectStore. I know it is a lot to ask (as this is again a fairly large change) but it would be great if you could take a "quick" look and at least validate the overall approach. Thanks 😄
I hope to have a look at this PR later today
#[test]
fn combine_zero_filters() {
    let result = combine_filters(&[]);
    assert_eq!(result, None);
}

#[test]
fn combine_one_filter() {
    let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
    let result = combine_filters(&[filter.clone()]);
    assert_eq!(result, Some(filter));
}

#[test]
fn combine_multiple_filters() {
    let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
    let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
    let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
    let result =
        combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
    assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
}
These tests were in the wrong place (datasource::parquet.rs); moving them here to avoid losing them.
If you agree with this new way of organizing the file format datasources, my feeling would be that we should merge this as is. We can then remove the existing parquet/csv/json/avro providers in a follow-up PR.
I did not review all the new code carefully (yet) but instead initially focused on the structure.
TLDR is I (really) like it -- nice work @rdettai
For any other reviewers, I suggest looking at the FileFormat trait and then the organization of ListingTable.
My biggest question is "why not use the ObjectStore interface in ListingTable (and maybe call it `ObjectStoreTable`)?"
It is my opinion that if ListingTable used the ObjectStore interface, it would be reasonable to merge this PR: even though it has substantial duplication with existing code, using the ObjectStore interface would mean it also had new features.
If we are going to leave ListingTable file backed, I think it would make sense to remove the old code paths as part of the same PR (so we can use existing end-to-end tests, etc)
// specific language governing permissions and limitations
// under the License.

//! A table that uses the `ObjectStore` listing capability
I think that it is still a "TODO" to use ObjectStore in this module.
What do you think about doing that in this PR? The use of ObjectStore seems like one of the key outcomes of this restructuring work to me, so I do wonder about postponing it (especially as this proposed PR is additive and doesn't change the existing providers)
I think you are right. I didn't want to add ObjectStore usage yet to keep the PR simpler, but it definitely makes sense to do it all at once.
Interesting turn of events, I just realized that the LocalFileReader reader from ObjectReader isn't fully implemented yet. 🙃
I haven't had the time to take a close look at all the changes yet; I am running out of time, so I plan to do it tomorrow. From a quick glance, I think the design is solid and on the right track, great work @rdettai 👍
Hi @rdettai, Thanks a lot for your work! I have several higher-level questions:
- How do you deal with a table with mixed file formats? I didn't get the possibilities in the current implementation. I perceive it as the biggest benefit from this reorganization.
- I agree with @alamb on putting the ObjectStore listing API in use in this PR; I think we have all the prerequisites needed to use it for real, since you already made scan in TableProvider async and are doing the refactor of data sources in this one.
- (From my own perspective while reading the code) Will this refactor wipe out all the git history for each source file, both in the datasource module and in the physical_plan module? I think we should at least remove the deprecated ones in the same PR to preserve the git history for each file. I regard the git history as precious documentation for understanding code as a newcomer.
- Again, does it make much sense to reorder the ones in physical_plan? I didn't quite get the reason here.
pub trait FileFormat: Send + Sync {
    /// Open the files at the paths provided by iterator and infer the
    /// common schema
    async fn infer_schema(&self, paths: StringStream) -> Result<SchemaRef>;
Maybe FileMetaStream as argument instead?
Yes, I think I will get to that while I add the ObjectStore abstraction.
/// For example `Vec["a", "b"]` means that the two first levels of
/// partitioning expected should be named "a" and "b":
/// - If there is a third level of partitioning it will be ignored.
/// - Files that don't follow this partitioning will be ignored.
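To illustrate the behaviour documented above, a hypothetical helper (not code from this PR) could map a Hive-style path to its partition values roughly like this:

// Hypothetical sketch of the documented behaviour: given the expected
// partition names, extract the string values from a Hive-style path and
// reject files that don't follow the scheme. Not code from this PR.
fn parse_partitions(path: &str, expected: &[&str]) -> Option<Vec<String>> {
    let mut values = Vec::new();
    let mut segments = path.split('/');
    for name in expected {
        let segment = segments.next()?;
        let (key, value) = segment.split_once('=')?;
        if key != *name {
            return None; // file doesn't follow the expected partitioning
        }
        values.push(value.to_string());
    }
    Some(values)
}

fn main() {
    // Two expected levels "a" and "b"; a third level is simply ignored.
    assert_eq!(
        parse_partitions("a=2021/b=us/extra=x/f.parquet", &["a", "b"]),
        Some(vec!["2021".to_string(), "us".to_string()])
    );
    // Files that don't comply are ignored (None).
    assert_eq!(parse_partitions("b=us/f.parquet", &["a", "b"]), None);
    println!("ok");
}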
Regarding the comments here, we haven't thought about how to deal with default partition semantics yet.
I.e., while inserting into a table with a NULL value for a partition column, how do we deal with a=20210922/b=NULL/1.parquet? How do we differentiate whether the NULL is a valid string value or denotes a missing value?
I recall Hive has __HIVE_DEFAULT_PARTITION__ for this purpose.
From what I know, there isn't an official standard regarding this partitioning. The feature set that I am proposing here is pretty basic:
- you should know the partitions in advance and specify them
- files that don't comply are ignored
- all partition values will be read with type string
I believe this feature set is simple to implement and understand, it is predictable, and it covers most use cases:
- if your partitioning type is not string, you can cast it later in your query as you want (a sketch of such a query is shown below)
- if you have encoded NULL somehow, you can also parse that with a CASE expression in your query
- performance wise, we can have an equally efficient materialization into an Arrow array as for most types if we use dictionary encoding
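As a purely illustrative sketch of the two query-side workarounds above (the table name, column names and the NULL sentinel are assumptions, not something defined by this PR):

// Hypothetical example: partition columns `year` and `region` are exposed as
// plain strings by the listing provider; the query casts them back and decodes
// a Hive-style NULL sentinel. Nothing here is actual DataFusion API.
fn main() {
    let sql = "
        SELECT
            CAST(year AS INT) AS year,
            CASE WHEN region = '__HIVE_DEFAULT_PARTITION__'
                 THEN NULL
                 ELSE region
            END AS region,
            value
        FROM my_listing_table
        WHERE year = '2021'  -- note: the filter compares against the string value
    ";
    println!("{}", sql);
}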
I recall Hive has __HIVE_DEFAULT_PARTITION__ for this purpose.

This is correct: Hive, and Spark which inherited this behavior, use __HIVE_DEFAULT_PARTITION__ to denote a null partition value by default. I feel like we can make this configurable and use __HIVE_DEFAULT_PARTITION__ as the default for compatibility with other systems.
From what I know, there isn't an official standard regarding this partitioning.
I think so too. AFAIK, there is no consensus in the big data ecosystem on a formal partition file path ser/de convention. Most systems are just trying to replicate what hive has.
if your partitioning type is not string, you can cast it later in your query as you want
Could you give us an example of what the UX would look like? How does a user perform partition value type casting in a query? Did you mean the user needs to manually cast the filter value to string if the filter is applied to a partition column?
Same for the CASE expression for handling NULLs. It would be helpful to have an example.
I think we should probably put more thoughts into this since this design decision will have a big impact to downstream query UX.
/// - Files that don't follow this partitioning will be ignored.
/// Note that only `DataType::Utf8` is supported for the column type.
/// TODO implement case where partitions.len() > 0
pub partitions: Vec<String>,
Does it make sense to have partitions be a schema instead of Vec<String>, and introduce a cast if needed from the path string to its partition_values or Vec<ScalarValue>?
See #1010 (comment)
Thanks for all your great feedback!
Frankly speaking, I have never met a case of mixed file formats, so I wouldn't really know what is important to take into consideration. Can you describe your use case precisely? Do you have an example of an API that supports it?
Sold! I'm working on it 😄
I think that the change is so massive that git will not recognize it as the previous files anyway.
By "reorder", do you mean moving them to a separate folder?
We can, but it would make this PR huuuuuuuuge. From the development side, that's ok, but for the reviewers it might get tricky. @alamb @houqp what do you think about it?
I may have gotten this wrong if you were not implying in the original doc that a table can have its data in different file formats. I thought data lake implementations may store regular data in a columnar format such as Parquet, and deltas (adds or removes) in a row-based format like Avro or JSON, which feels similar to SAP HANA http://www.vldb.org/pvldb/vol5/p061_jenskrueger_vldb2012.pdf and HyPer http://db.in.tum.de/downloads/publications/datablocks.pdf
Yes, I find it hard to tell what has been removed and what the additions are for the physical_plan files; I had to diff the files manually to find out. But what I really want to argue is: why should we diff manually and leave git diff aside? Where can I find the original PR and related issue while reading code, if it points to a reorganize PR with little information about it? If we are following the rule that each PR should target one problem, why should we split the reorganization into one addition PR and one stale-removal PR? And why do we restructure the physical_plan module for "nice symmetry" but accept that "git will not recognize it as the previous files"?
I was not implying anything! 😄 I just didn't have an example in mind. Thanks for the pointers! I think that handling these cases will require creating dedicated TableProvider implementations.
I do know that git history is very important, I am not arguing that 😃. Do we all agree that we should directly replace the old implementations in this PR, even if it means that we will need to modify a large part of the code base at once? Does someone have a better split for this PR in mind? PS: I just opened a new branch with the addition of the ObjectStore.
For what it is worth, I think the design of this PR makes it easier to support mixed file formats.
I can see both sides here (one massive PR vs multiple smaller PRs) and they both involve tradeoffs. Some thoughts on making reviews easier:
Regardless of the route, I wonder if this PR is one we may want to draw some extra attention to.
To @yjshen's point of supporting mixed file formats, it's totally a valid use case and a widely adopted practice. But I agree with @alamb that we don't need to make the listing table do everything now. We can keep it simple and only support partitioned tables with a single file format. The more complex mixed file format table formats typically have their own specific file organization and schema management logic, so it would be hard to come up with one table provider to capture them all. For example, Hudi uses Avro and Parquet, DeltaLake uses Parquet and JSON, etc. The better approach in my mind is to create table format specific providers for each of these specialized table formats as plugins.

I also think there is value in grouping logically coupled changes into a single commit so it's easier to do git diff. I care less about whether git is smart enough to keep track of the file rename, but at least being able to do a git blame and find all logically related changes in a single commit helps a lot.

I totally understand how painful it is to do a large PR because of the need to constantly manage merge conflicts, so I am happy to help if there is anything I can do on my end to alleviate your pain. Just let me know ;) I think it's a good tradeoff to do a bit of extra work now for the peace of mind in the future.
Thank you again for your feedback. I don't mind having to manage a large PR; my only concern is the reviewer experience, as it is crucial to allow us to lead this to the merge 😉. I have created a PR against this PR (rdettai#1) with the addition of the ObjectStore.
Force-pushed from f85f66e to 4d92231
* [fix] replace file format providers in datafusion
* [lint] clippy
* [fix] replace file format providers in ballista
* [fix] await in python wrapper
Force-pushed from 2e4452a to 6db3533
LGTM. I too think we should try to merge this into master as soon as we can because it's now at risk of conflicts.
LGTM
Wow, thank you so much @rdettai, truly astonishing work!
/// The configurations to be passed when creating a physical plan for
/// a given file format.
pub struct PhysicalPlanConfig {
FileFormatConfig maybe?
Which issue does this PR close?
Closes #1009.
Rationale for this change
Currently the TableProvider implementations are split by file format (Parquet, CSV...). One other solution would be to organize TableProviders by table format (file system listing, Iceberg, Delta...). This is discussed in this design document.

What changes are included in this PR?
The file format specific implementations of TableProvider are replaced with:
- a ListingTable implementation of TableProvider that finds files by using the "list" functionality of the file system
- a FileFormat abstraction and implementations for each file format
The TableDescriptor abstraction is removed, and the GetFileMetadata Ballista rpc endpoint is replaced with GetSchema.
Errata: I propose not to remove the old implementations but only add the new ones. We will remove the old ones in a separate PR!

Are there any user-facing changes?
The current implementations of TableProvider will be replaced, but this will partly be abstracted by methods such as ExecutionContext.read_parquet() or ExecutionContext.read_sql().
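For illustration, reading a Parquet-backed table through this API might look roughly as follows; the async signatures, the tokio setup and the path are assumptions about the post-refactor state rather than something guaranteed by this PR:

// Hedged sketch: exact method signatures and crate version are assumed,
// not confirmed by this PR.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut ctx = ExecutionContext::new();
    // A ListingTable-backed provider would be created behind this call; the
    // caller still only supplies a path.
    let df = ctx.read_parquet("data/my_table").await?;
    df.show().await?;
    Ok(())
}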