Feedback #1
Hey @martindurant, thanks for pinging me, and this is awesome! Great to see a working Dask integration, and I love the name 😄 Looking at the code, I also see some gaps in the implementation, for example hidden partitioning, which is the secret sauce of Iceberg. But you already mentioned that it is a POC.
I don't think we should choose between PyArrow and fastparquet support; I think we should make them both available. Once we have apache/iceberg#6069 in, we will know which files we need to load, and we can integrate the creation of the Dask dataframe into PyIceberg. What do you think?
More fair to say "filesystem-based datasets", which is more likely to be S3 for me. Getting the same metadata from a service is reasonable, but this method cuts out the middleman.
Since I don't know what this is, you are clearly right!
Totally agree, but of course we do need to be aware of the capabilities of both. Since I am responsible for fastparquet, I can make any changes there that might be needed, like the linked PR. Changes in arrow are much harder to achieve (for me, and probably in general).
@Fokko, for some background here, I've generated some sample data in a public S3 bucket that @martindurant is probably working off of. Configuring cross-account Glue access or a dedicated Nessie server just to provide access to some static tables in S3 is overkill. I realize the criticality of having a catalog for atomicity, but for a slow-changing dataset, being able to point Iceberg directly at an S3 location and launch some queries outweighs the complexity of doing it "right", IMO. I think supporting that mode of access is worthwhile. I'd be interested to hear more about the internal conversations that have led to the current position that a catalog will be required.
@martindurant in the sample data we have a hidden partition. Hidden Partitioning lets you store only a source column (for example a timestamp), while Iceberg derives the partition values from it through a transform, so filters on the source column can still prune partitions. For a full-featured table scan that functionality will be necessary, but hopefully the pyiceberg project will handle that complexity for us soon.
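As a rough illustration of the idea (not daskberg's or PyIceberg's actual code), a "day" transform stores only the timestamp column, derives the partition value as days since the epoch, and lets a filter on the timestamp skip whole partitions; the names below are made up for the sketch.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_transform(ts: datetime) -> int:
    # Day transform: days since the Unix epoch.
    return (ts - EPOCH).days

def partition_matches(partition_day: int, lo: datetime, hi: datetime) -> bool:
    # A file whose partition day falls outside the filter's day range can be
    # skipped without reading the (hidden) partition column from the data.
    return day_transform(lo) <= partition_day <= day_transform(hi)

# Example: a filter on ts for a single day only touches one partition value.
lo = datetime(2023, 1, 2, tzinfo=timezone.utc)
hi = datetime(2023, 1, 2, 23, 59, 59, tzinfo=timezone.utc)
print(partition_matches(day_transform(lo), lo, hi))  # True
```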
Oh, those partitions. Yes, this is supported, with as many transforms as I could be bothered to implement so far ( https://github.com/martindurant/daskberg/blob/main/daskberg/conversions.py#L46 ); but there are not many. So you may have a column called "name" and partition on buckets, and expect a filter on "name" to be translated into a filter on the bucket partition.
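For reference, my reading of the spec is that Iceberg's bucket transform hashes the value with 32-bit murmur3 and takes the result modulo the bucket count; the sketch below (using the mmh3 package, not daskberg's code) shows roughly how an equality filter on a string column maps to a single bucket.

```python
import mmh3  # pip install mmh3

def bucket(value: str, num_buckets: int) -> int:
    # 32-bit murmur3 hash of the UTF-8 bytes, masked to a non-negative int,
    # modulo the number of buckets.
    h = mmh3.hash(value.encode("utf-8"), seed=0, signed=True)
    return (h & 0x7FFFFFFF) % num_buckets

# An equality filter like ("name", "==", "alice") only needs the files whose
# partition value equals bucket("alice", N); all other buckets can be skipped.
print(bucket("alice", 16))
```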
The last note I had on the current implementation: it looks like you'll currently open every manifest file from the manifest list and begin scanning for matching partitions. The manifest list also carries partition bounds that should be considered, so that you only have to open a manifest file if the partition is within that manifest's bounds. For our data, we might be generating a few manifest files per day, and we wouldn't want to retrieve hundreds of them from S3 just to see if they contain data for the date we're looking for. Instead we should work from the manifest list, identify which manifest files include data files within our bounds, and then open those manifest files and restrict further based on the column bounds. I might be looking past that functionality in the code, but thought I'd mention it just in case.
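A rough sketch of that pruning step, assuming the manifest list has already been read with an Avro library and the partition summary bounds decoded into comparable values (field names follow the Iceberg spec; the decode step is omitted):

```python
def manifests_to_open(manifest_list_entries, lo, hi, partition_index=0):
    """Keep only manifests whose partition summary overlaps [lo, hi].

    manifest_list_entries: records from the manifest-list Avro file; each has
    "manifest_path" and a "partitions" list of per-field summaries with
    "lower_bound"/"upper_bound" (assumed already decoded here for simplicity).
    """
    selected = []
    for entry in manifest_list_entries:
        summaries = entry.get("partitions") or []
        if partition_index >= len(summaries):
            # No summary for this partition field: we must open the manifest.
            selected.append(entry["manifest_path"])
            continue
        s = summaries[partition_index]
        # Overlap test against the filter's bounds.
        if s["lower_bound"] <= hi and s["upper_bound"] >= lo:
            selected.append(entry["manifest_path"])
    return selected
```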
Ah, I didn't realise that the manifests had partition min/max values - very clever! It will be trivial to deal with those, although harder to engineer some data to test with. Furthermore, we should be using fsspec's magic to collect the bytes of all the required manifest Avro files concurrently, rather than the current serial approach; but these optimisations can wait. I have managed to make my own test data, but only with spark-sql. I found an OK tutorial in the Dremio documentation.
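The fsspec "magic" referred to is, I assume, the batched `cat` call, which fetches many remote files in one go (concurrently on async filesystems such as s3fs); a minimal sketch with fastavro for decoding, with a placeholder bucket path:

```python
import io
import fsspec
import fastavro  # pip install fastavro s3fs

def read_manifests(paths):
    """Fetch several Avro manifest files from S3 in one concurrent call."""
    fs = fsspec.filesystem("s3", anon=True)  # anonymous access; adjust as needed
    blobs = fs.cat(paths)  # dict of {path: bytes}, fetched concurrently
    return {path: list(fastavro.reader(io.BytesIO(data)))
            for path, data in blobs.items()}

# entries = read_manifests(["my-bucket/warehouse/db/table/metadata/manifest-1.avro"])
```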
Sorry for not sharing this earlier, but I always use this to create test data: https://github.com/tabular-io/docker-spark-iceberg You could mount the table under the same path on your host system. The metadata is available through the REST catalog. This also allows for the easy creation of different partition transforms.
Hi @martindurant, I have been using dask for a while now and have recently started playing around with iceberg, coming across this thread :)
I noticed that pyiceberg can read tables and daskberg will also be able to read tables (haven't tested myself quite yet). Have you found a way to create tables without requiring Spark? I have mostly stayed in the dask ecosystem and would like to stay there if possible 😛 Thanks!
No, daskberg does not have a write mode, so not for now. It would not be too hard, at least with pure files (rather than the REST server), but I was wanting first to see if there was any interest in this package or not.
Really?
That would be awesome if Daskberg could 😀 I thought it could read, but it looks like you have to pass the file list to something else; it seems to support pyarrow and duckdb.
I see, it looks like
Note that pyiceberg also has no writing: you can create tables/schemas, but not put any data. Actually you can do that with daskberg too. The next pyiceberg community meeting isn't until the 31st; I'll try to find out what their plans and timelines are. It may well still prove worthwhile to maintain this package as a simpler alternative, where I can fold in secret fastparquet sauce.
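For context on the "create tables/schemas" part, a minimal sketch with pyiceberg (the catalog name, namespace, and field definitions are placeholders, and the exact API may differ between pyiceberg versions):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, LongType, StringType, TimestamptzType

# "default" and its connection properties would come from pyiceberg config.
catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    NestedField(field_id=3, name="ts", field_type=TimestamptzType(), required=False),
)

# Creates the table metadata in the catalog; no data files are written.
table = catalog.create_table("examples.events", schema=schema)
```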
Ah yes, after exploring a bit I noticed that as well. I could not really tell exactly what the Spark engine does when writing to the table. I assume it lets the catalog/table know all the relevant information, but it looks like some of the niceties like schema updates might not be fully baked in (https://github.com/apache/iceberg/blob/cf00f6a06b256e9c4defe226b6a37aa83c40f561/python/pyiceberg/table/__init__.py#L79). That said, I am very new to the Iceberg world and still trying to wrap my head around it.
Metadata updates involve writing a new JSON "v" file, and data updates involve writing the new data files (Parquet), manifests for those (and for deletes), and a new manifest-list "snapshot" file in Avro. Communicating this to a server appears to need sending the same information bundled as JSON to the REST endpoint (I haven't checked hive/thrift). The complexity comes in partitioning the writes, and in any transforms in the partitioning or sorting to be applied - maybe not necessary for simple operation, but a big part of Iceberg's "secret sauce". Plus, dask has no API yet to do anything other than append to a dataset.
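To make the metadata part concrete, a heavily simplified sketch of what committing a new snapshot could look like for a purely file-based table. Field names follow the Iceberg table-metadata spec, but real commits also maintain sequence numbers and snapshot logs and must be atomic; `manifest_list_path` is assumed to point at an already-written Avro manifest list.

```python
import json
import time
import fsspec

def commit_snapshot(table_location, current_version, manifest_list_path):
    """Write a new v(N+1) metadata file that points at a new manifest list."""
    fs = fsspec.filesystem("file")  # or "s3" for a remote table
    old = f"{table_location}/metadata/v{current_version}.metadata.json"
    with fs.open(old) as f:
        meta = json.load(f)

    snapshot_id = int(time.time() * 1000)  # placeholder ID scheme
    meta.setdefault("snapshots", []).append({
        "snapshot-id": snapshot_id,
        "timestamp-ms": int(time.time() * 1000),
        "manifest-list": manifest_list_path,
        "summary": {"operation": "append"},
    })
    meta["current-snapshot-id"] = snapshot_id

    new = f"{table_location}/metadata/v{current_version + 1}.metadata.json"
    with fs.open(new, "w") as f:
        json.dump(meta, f)
    return new
```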
Happy new year everyone!
It is a bit more involved than we initially anticipated. We ended up implementing the ID-based projection ourselves. Instead of relying on the names of the fields, Iceberg relies on the IDs of each field, which are embedded in the Parquet file (and the Iceberg schema). For example, if you write a bunch of files and then rename a column, with Iceberg you don't have to rewrite the files that you wrote before, because we just map the ID of the column and rename that column when reading the data. I would expect that we need to do something similar for fastparquet.

With respect to writing, that is a bit further out, because we want to make sure that we write the Avro files correctly, and test this thoroughly. There is nothing worse than corrupting a table in a table format :) Simple operations like appending files should be quite easy. For more complicated stuff like updating, I think a bit more work is involved (where merge-on-read is easier than the copy-on-write strategy).
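As an illustration of ID-based projection (not pyiceberg's actual implementation), PyArrow exposes Parquet field IDs through the `PARQUET:field_id` metadata key on the Arrow schema, assuming the writer stored them; renaming then becomes a lookup from ID to the current Iceberg name:

```python
import pyarrow.parquet as pq

def read_with_current_names(path, id_to_name):
    """Read a Parquet file and rename top-level columns via field IDs.

    id_to_name: mapping of Iceberg field ID -> current column name,
    taken from the table's Iceberg schema.
    """
    file_schema = pq.read_schema(path)
    new_names = []
    for field in file_schema:
        meta = field.metadata or {}
        field_id = meta.get(b"PARQUET:field_id")  # set by ID-aware writers
        if field_id is not None and int(field_id) in id_to_name:
            new_names.append(id_to_name[int(field_id)])
        else:
            new_names.append(field.name)  # fall back to the name as written
    table = pq.read_table(path)
    return table.rename_columns(new_names)
```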
cc @Fokko.
This is a super simple implementation of an iceberg client for dask. It works for the limited couple of datasets I have available. This is a POC, but I think an effective one.

Note that the client works on data locations/paths only, but they can be remote. It is the opposite of pyiceberg in this sense, which talks to the metadata store over thrift or REST. Also note that fastparquet loads decimal columns as float64, and arrow makes python decimal objects (super slow!) even though it supports a decimal type internally.
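On the decimal point, a sketch (not part of daskberg) of one way to avoid the per-value Python Decimal objects with PyArrow, by casting decimal columns to float64 before converting to pandas:

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

def read_decimals_as_float(path):
    """Read a Parquet file, downcasting any decimal columns to float64."""
    table = pq.read_table(path)
    cols = []
    for name, col in zip(table.column_names, table.columns):
        if pa.types.is_decimal(col.type):
            # Lossy, but avoids constructing a Python Decimal per value.
            col = pc.cast(col, pa.float64())
        cols.append(col)
    return pa.table(dict(zip(table.column_names, cols))).to_pandas()
```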