From bf479189f230fb541b4ce3e2da9f175aa837eb90 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Wed, 23 Jan 2019 19:51:33 -0500
Subject: [PATCH] When calling datashader.utils.mesh with dask dataframes,
 return a dask dataframe.

Computing the mesh still requires bringing the entire vertices/simplices
dataframes into memory, but the resulting mesh is now a Dask dataframe
whose partitions are chosen so that no triangle straddles a partition
boundary.
---
 datashader/tests/test_dask.py | 52 +++++++++++++++++++++++++++++++++++
 datashader/utils.py           | 18 ++++++------
 2 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py
index 498bed756..43275c4f7 100644
--- a/datashader/tests/test_dask.py
+++ b/datashader/tests/test_dask.py
@@ -1,3 +1,4 @@
+from __future__ import division
 from dask.context import config
 import dask.dataframe as dd
 import numpy as np
@@ -5,6 +6,7 @@
 import xarray as xr
 
 import datashader as ds
+import datashader.utils as du
 
 import pytest
 
@@ -416,3 +418,53 @@ def test_trimesh_no_double_edge():
         [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
     ], dtype='i4')
     np.testing.assert_array_equal(np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)
+
+
+@pytest.mark.parametrize('npartitions', list(range(1, 6)))
+def test_trimesh_dask_partitions(npartitions):
+    """Assert that when mesh() is given dask dataframes, it returns a dask
+    dataframe whose partitions each hold a whole number of triangles, and
+    that rendering such a mesh still produces the expected aggregate.
+    """
+    # Two triangles sharing the left/right edge
+    verts = dd.from_pandas(pd.DataFrame({'x': [4, 1, 5, 5, 5, 4],
+                                         'y': [4, 5, 5, 5, 4, 4]}),
+                           npartitions=npartitions)
+    tris = dd.from_pandas(
+        pd.DataFrame(
+            {'v0': [0, 3], 'v1': [1, 4], 'v2': [2, 5], 'val': [1, 2]}),
+        npartitions=npartitions)
+
+    cvs = ds.Canvas(plot_width=20, plot_height=20,
+                    x_range=(0, 5), y_range=(0, 5))
+
+    # Precompute mesh with dask dataframes
+    mesh = du.mesh(verts, tris)
+
+    # Make sure mesh is a dask DataFrame
+    assert isinstance(mesh, dd.DataFrame)
+
+    # Check mesh length
+    n = len(mesh)
+    assert n == 6
+
+    # Make sure we have the expected number of partitions
+    expected_chunksize = int(np.ceil(len(mesh) / (3*npartitions)) * 3)
+    expected_npartitions = int(np.ceil(n / expected_chunksize))
+    assert expected_npartitions == mesh.npartitions
+
+    # Make sure triangles don't straddle partitions
+    partition_lens = mesh.map_partitions(len).compute()
+    for partition_len in partition_lens:
+        assert partition_len % 3 == 0
+
+    agg = cvs.trimesh(verts, tris, mesh)
+    sol = np.array([
+        [0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0],
+        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 0],
+        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 0],
+        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+    ], dtype='i4')
+    np.testing.assert_array_equal(
+        np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)
diff --git a/datashader/utils.py b/datashader/utils.py
index 0685c855a..44f521821 100644
--- a/datashader/utils.py
+++ b/datashader/utils.py
@@ -450,18 +450,16 @@ def _dd_mesh(vertices, simplices):
     Dask DataFrame objects.
""" # Construct mesh by indexing into vertices with simplex indices - # TODO: For dask: avoid .compute() calls, and add winding auto-detection - vertex_idxs = simplices.values[:, :3].astype(np.int64) - vals = vertices.values.compute()[vertex_idxs] - vals = vals.reshape(np.prod(vals.shape[:2]), vals.shape[2]) - res = pd.DataFrame(vals, columns=vertices.columns) + # TODO: For dask: avoid .compute() calls + res = _pd_mesh(vertices.compute(), simplices.compute()) - # If vertices don't have weights, use simplex weights - verts_have_weights = len(vertices.columns) > 2 - if not verts_have_weights: - weight_col = simplices.columns[3] - res[weight_col] = simplices.values[:, 3].compute().repeat(3) + # Compute a chunksize that will not split the vertices of a single + # triangle across partitions + approx_npartitions = max(vertices.npartitions, simplices.npartitions) + chunksize = int(np.ceil(len(res) / (3*approx_npartitions)) * 3) + # Create dask dataframe + res = dd.from_pandas(res, chunksize=chunksize) return res