Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: minimal support for dask.dataframe query planning (dask-expr) #285

Merged
merged 12 commits into from
May 6, 2024

Conversation

jorisvandenbossche
Copy link
Member

@jorisvandenbossche jorisvandenbossche commented Mar 11, 2024

Tackling #284, for now in a minimal way, i.e. keeping as much as possible the current code based on "legacy" dask and then turn that into an expression.

For example, for sjoin/clip/file IO, we manually build up the graph to create a collection using HighLevelGraph, for now this graph is just turned into an expression with from_graph. Long term, we can turn each of those into proper expression classes.

I am not yet fully sure how to best organize the code. For now, the new file expr.py is essentially duplicating a lot of core.py. I organized the commits such that the first just does this copy, and then you can look at the second commit to see the actual changes that were needed to core.py (in expr.py) to make it work with dask-expr -> d28521f

The problem is that I can't do this directly, because we want to keep supporting legacy dask for a while as well (our code is actually also still using this for intermediate results)

Some other remarks:

  • There is one actual test failure that I haven't yet be able to debug/fix (split_out keyword in dissolve), this is xfailed for now
  • The spatial_partitions handling is not yet very robust. For now this just takes the same approach as in core.py to attach this to the collection. But longer term we should attach this to the underlying expression of the collection. The current version "works" (the tests are passing), as long as you only do an operation that needs them directly after an operation that sets them.
  • All the element-wise geospatial methods that we add to the classes are still done using map_partitions like before. Longer term, we should turn those into custom expressions (this can then also allow better optimizations in the expression tree)

Closes #287

@martinfleis
Copy link
Member

@jorisvandenbossche can you ping me once you want a review of this? I'll do some reading on dask-expr in the meantime.

@jorisvandenbossche
Copy link
Member Author

I think this should be ready enough for a review

@TomAugspurger
Copy link
Contributor

I spent a bit of time looking at the split-out failure, but didn't make much progress. I'll take another look later.

@ReptarK
Copy link

ReptarK commented Apr 17, 2024

Hi ,thanks for the efforts on supporting dask-expr.
Any updates on the integration with it ?

@TomAugspurger
Copy link
Contributor

Some discussion at dask/dask-expr#1024.

@TomAugspurger
Copy link
Contributor

TomAugspurger commented May 4, 2024

@jorisvandenbossche I can't push to your branch, but this diff fixes the test test_from_dask_dataframe_with_dask_geoseries. That turned up an error in from_dask_dataframe with geometry=dask_series, which is also fixed in that commit (and I added a test for it directly).

diff --git a/dask_geopandas/expr.py b/dask_geopandas/expr.py
index 99d71d2..136953d 100644
--- a/dask_geopandas/expr.py
+++ b/dask_geopandas/expr.py
@@ -200,6 +200,7 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
     @classmethod
     def _bind_elemwise_operator_method(cls, name, op, original, *args, **kwargs):
         """bind operator method like GeoSeries.distance to this class"""
+
         # name must be explicitly passed for div method whose name is truediv
         def meth(self, other, *args, **kwargs):
             meta = _emulate(op, self, other)
@@ -505,7 +506,6 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
         return distances
 
     def geohash(self, as_string=True, precision=12):
-
         """
         Calculate geohash based on the middle points of the geometry bounds
         for a given precision.
@@ -842,7 +842,7 @@ def from_dask_dataframe(df, geometry=None):
     # it via a keyword-argument due to https://github.com/dask/dask/issues/8308.
     # Instead, we assign the geometry column using regular dataframe operations,
     # then refer to that column by name in `map_partitions`.
-    if isinstance(geometry, dd.core.Series):
+    if isinstance(geometry, dx.Series):
         name = geometry.name if geometry.name is not None else "geometry"
         return df.assign(**{name: geometry}).map_partitions(
             geopandas.GeoDataFrame, geometry=name
diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py
index b28a0c7..fbde582 100644
--- a/dask_geopandas/tests/test_core.py
+++ b/dask_geopandas/tests/test_core.py
@@ -390,27 +390,44 @@ def test_rename_geometry_error(geodf_points):
         dask_obj.rename_geometry("value1")
 
 
-# TODO to_dask_dataframe is now defined on the dask-expr collection, converting
-# to an old-style dd.core.DataFrame (so doing something different as we did here)
-@pytest.mark.xfail(
-    dask_geopandas.backends.QUERY_PLANNING_ON, reason="Need to update test for expr"
-)
 def test_from_dask_dataframe_with_dask_geoseries():
     df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
     dask_obj = dd.from_pandas(df, npartitions=2)
     dask_obj = dask_geopandas.from_dask_dataframe(
         dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
     )
-    # Check that the geometry isn't concatenated and embedded a second time in
-    # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
-    k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
-    deps = dask_obj.dask.dependencies[k]
-    assert len(deps) == 1
+
+    if dask_geopandas.backends.QUERY_PLANNING_ON:
+        deps = dask_obj.expr.dependencies()
+        assert len(deps) == 1
+        dep = deps[0]
+        other = list(dask_obj.dask.values())[0][3]["geometry"].dependencies()[0]
+        assert dep is other
+
+    else:
+        # Check that the geometry isn't concatenated and embedded a second time in
+        # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
+        k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
+        deps = dask_obj.dask.dependencies[k]
+        assert len(deps) == 1
 
     expected = df.set_geometry(geopandas.points_from_xy(df["x"], df["y"]))
+    dask_obj.geometry.compute()
     assert_geoseries_equal(dask_obj.geometry.compute(), expected.geometry)
 
 
+def test_set_geometry_to_dask_series():
+    df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
+
+    dask_obj = dd.from_pandas(df, npartitions=2)
+    dask_obj = dask_geopandas.from_dask_dataframe(
+        dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
+    )
+    expected = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.x, df.y))
+    result = dask_obj.geometry.compute()
+    assert_geoseries_equal(result, expected.geometry)
+
+
 def test_from_dask_dataframe_with_column_name():
     df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
     df["geoms"] = geopandas.points_from_xy(df["x"], df["y"])

@TomAugspurger
Copy link
Contributor

PR to your branch at jorisvandenbossche#1 that includes that diff and a fix for a couple more issues.

The final xfail with dask-expr is from split_out. I haven't had a chance to follow up on dask/dask-expr#1024, but it seems like it might be related to shuffling in dask-expr.

@jorisvandenbossche
Copy link
Member Author

@TomAugspurger thanks for the fixes!
(and good you mention it here, because apparently you don't watch your own fork by default, so I didn't get any notification from your PR on my repo ..)

@jorisvandenbossche jorisvandenbossche marked this pull request as ready for review May 6, 2024 11:32
def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs):
def to_dask_dataframe(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomAugspurger I renamed this back to the original name, but now this is fine because dask-expr upstream renamed the to_dask_dataframe they added to to_legacy_dataframe, so there is no longer a name clash.

to_legacy_dataframe will convert to a legacy dask.dataframe collection, which is needed for certain implementations that still use the original implementation (e.g. dask still does this for parquet IO right now).

While our to_dask_dataframe is meant to convert your dask-geopandas object to a dask object (regardless of it being a legacy collection or a new expression).

(and so naming this implementation to_legacy_dataframe actually broke the parquet tests, because it is not doing what dask is expecting, i.e. it doesn't return a legacy collection, just the same object but where the partitions are pd.DataFrames instead of GeoDataFrames)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, makes sense, thanks.

@jorisvandenbossche
Copy link
Member Author

@martinfleis unless you have time on the short term, I would suggest we already merge this to have a working main branch with latest dask, and further fixes/improvements can be done in follow-ups

@martinfleis
Copy link
Member

@jorisvandenbossche Go ahead

@jorisvandenbossche jorisvandenbossche merged commit cc9076f into geopandas:main May 6, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

BUG: to_parquet() failing with dask=2024.4.1
4 participants