Initial Dask trimesh support #696

Merged: 3 commits, Feb 7, 2019
1 change: 0 additions & 1 deletion .appveyor.yml
@@ -10,7 +10,6 @@ environment:
 install:
 - "SET PATH=%CONDA%;%CONDA%\\Scripts;%PATH%"
 - "conda install -y -c pyviz pyctdev && doit ecosystem_setup"
-- conda install -y "conda<4.6"
 - "doit env_create %CHANNELS% --name=test --python=%PY%"
 - "activate test"
 - "doit develop_install %CHANNELS%"
3 changes: 2 additions & 1 deletion datashader/glyphs.py
@@ -123,7 +123,8 @@ def __init__(self, x, y, z=None, weight_type=True, interp=True):

     @property
     def inputs(self):
-        return tuple([self.x, self.y] + list(self.z))
+        return (tuple([self.x, self.y] + list(self.z)) +
+                (self.weight_type, self.interpolate))
Collaborator Author:
This change was needed because the inputs tuple is used by the parent class to implement hashing and equality, which are in turn used for memoization. Without this, I was seeing cases where repeated use of canvas.trimesh with different values for interpolate was not resulting in updated aggregation behavior.
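For background, a minimal sketch (a hypothetical base class, not datashader's actual implementation) of how an inputs tuple typically drives hashing, equality, and therefore memoization:

# Hypothetical sketch: a parent class derives __hash__/__eq__ from `inputs`.
# If `interpolate` is left out of `inputs`, two glyphs that differ only in
# `interpolate` compare equal, so a memoized aggregation is wrongly reused.
class GlyphBase(object):
    @property
    def inputs(self):
        raise NotImplementedError

    def __hash__(self):
        return hash((type(self).__name__, self.inputs))

    def __eq__(self, other):
        return type(self) is type(other) and self.inputs == other.inputs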


def validate(self, in_dshape):
for col in [self.x, self.y] + list(self.z):
52 changes: 52 additions & 0 deletions datashader/tests/test_dask.py
@@ -1,10 +1,12 @@
from __future__ import division
from dask.context import config
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr

import datashader as ds
import datashader.utils as du

import pytest

@@ -476,3 +478,53 @@ def test_trimesh_no_double_edge():
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
], dtype='i4')
np.testing.assert_array_equal(np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)


@pytest.mark.parametrize('npartitions', list(range(1, 6)))
def test_trimesh_dask_partitions(npartitions):
"""Assert that when two triangles share an edge that would normally get
double-drawn, the edge is only drawn for the rightmost (or bottommost)
triangle.
"""
# Test left/right edge shared
verts = dd.from_pandas(pd.DataFrame({'x': [4, 1, 5, 5, 5, 4],
'y': [4, 5, 5, 5, 4, 4]}),
npartitions=npartitions)
tris = dd.from_pandas(
pd.DataFrame(
{'v0': [0, 3], 'v1': [1, 4], 'v2': [2, 5], 'val': [1, 2]}),
npartitions=npartitions)

cvs = ds.Canvas(plot_width=20, plot_height=20,
x_range=(0, 5), y_range=(0, 5))

# Precompute mesh with dask dataframes
mesh = du.mesh(verts, tris)

# Make sure mesh is a dask DataFrame
assert isinstance(mesh, dd.DataFrame)

# Check mesh length
n = len(mesh)
assert n == 6

# Make sure we have expected number of partitions
expected_chunksize = int(np.ceil(len(mesh) / (3*npartitions)) * 3)
expected_npartitions = int(np.ceil(n / expected_chunksize))
assert expected_npartitions == mesh.npartitions

# Make sure triangles don't straddle partitions
partitions_lens = mesh.map_partitions(len).compute()
for partitions_len in partitions_lens:
assert partitions_len % 3 == 0

agg = cvs.trimesh(verts, tris, mesh)
sol = np.array([
[0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
], dtype='i4')
np.testing.assert_array_equal(
np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)
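
To run just this parametrized test locally (standard pytest usage, with the repository path shown above):

pytest datashader/tests/test_dask.py -k test_trimesh_dask_partitions -v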
18 changes: 8 additions & 10 deletions datashader/utils.py
@@ -450,18 +450,16 @@ def _dd_mesh(vertices, simplices):
     Dask DataFrame objects.
     """
     # Construct mesh by indexing into vertices with simplex indices
-    # TODO: For dask: avoid .compute() calls, and add winding auto-detection
-    vertex_idxs = simplices.values[:, :3].astype(np.int64)
-    vals = vertices.values.compute()[vertex_idxs]
-    vals = vals.reshape(np.prod(vals.shape[:2]), vals.shape[2])
-    res = pd.DataFrame(vals, columns=vertices.columns)
+    # TODO: For dask: avoid .compute() calls
+    res = _pd_mesh(vertices.compute(), simplices.compute())
Collaborator Author:
We were calling compute on both vertices and simplices anyway, so I opted to just call the pandas version. In addition to making this more concise, the pandas version has winding auto-detection enabled, which was not previously enabled here.
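As background on the winding auto-detection mentioned above (a generic illustration, not datashader's actual implementation): a triangle's winding order can be read off the sign of its signed area, and inconsistently wound triangles fixed by swapping two vertices.

import numpy as np

def signed_area(p0, p1, p2):
    # Positive for counter-clockwise vertex order, negative for clockwise.
    return 0.5 * ((p1[0] - p0[0]) * (p2[1] - p0[1])
                  - (p2[0] - p0[0]) * (p1[1] - p0[1]))

tri = (np.array([0., 0.]), np.array([1., 0.]), np.array([0., 1.]))
if signed_area(*tri) < 0:  # clockwise: swap two vertices to flip winding
    tri = (tri[0], tri[2], tri[1])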


-    # If vertices don't have weights, use simplex weights
-    verts_have_weights = len(vertices.columns) > 2
-    if not verts_have_weights:
-        weight_col = simplices.columns[3]
-        res[weight_col] = simplices.values[:, 3].compute().repeat(3)
+    # Compute a chunksize that will not split the vertices of a single
+    # triangle across partitions
+    approx_npartitions = max(vertices.npartitions, simplices.npartitions)
+    chunksize = int(np.ceil(len(res) / (3*approx_npartitions)) * 3)
 
+    # Create dask dataframe
+    res = dd.from_pandas(res, chunksize=chunksize)
     return res


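For intuition on the chunksize formula above: each triangle contributes three consecutive rows to the mesh, so rounding the chunk size up to a multiple of 3 guarantees that dd.from_pandas never splits a triangle across partitions. A small standalone check of the same arithmetic (illustrative numbers, not taken from the PR):

import numpy as np

n_rows = 30  # e.g. 10 triangles x 3 vertices each
approx_npartitions = 4
chunksize = int(np.ceil(n_rows / (3 * approx_npartitions)) * 3)  # ceil(2.5)*3 = 9
# Partition sizes produced by chunking 30 rows in groups of 9:
sizes = [min(chunksize, n_rows - i) for i in range(0, n_rows, chunksize)]
print(chunksize, sizes)  # 9 [9, 9, 9, 3] -- every partition holds whole triangles
assert all(size % 3 == 0 for size in sizes)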
32 changes: 32 additions & 0 deletions examples/user_guide/6_Trimesh.ipynb
@@ -47,6 +47,7 @@
"import numpy as np, datashader as ds, pandas as pd\n",
"import datashader.utils as du, datashader.transfer_functions as tf\n",
"from scipy.spatial import Delaunay\n",
"import dask.dataframe as dd\n",
"\n",
"n = 10\n",
"np.random.seed(2)\n",
@@ -345,6 +346,37 @@
" tf.shade(cvs.trimesh(verts, tris, mesh=mesh, agg=ds.std('z')), name='std')).cols(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parallelizing trimesh aggregation with Dask\n",
"The trimesh aggregation process can be parallelized by providing `du.mesh` and `Canvas.trimesh` with partitioned Dask dataframes.\n",
"\n",
"**Note:** While the calls to `Canvas.trimesh` will be parallelized across the partitions of the Dask dataframe, the construction of the partitioned mesh using `du.mesh` is not currently parallelized. Furthermore, it currently requires loading the entire `verts` and `tris` dataframes into memory in order to construct the partitioned mesh. Because of these constraints, this approach is most useful for the repeated aggregation of large meshes that fit in memory on a single multicore machine."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"verts_ddf = dd.from_pandas(verts, npartitions=4)\n",
"tris_ddf = dd.from_pandas(tris, npartitions=4)\n",
"mesh_ddf = du.mesh(verts_ddf, tris_ddf)\n",
"mesh_ddf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tf.shade(cvs.trimesh(verts_ddf, tris_ddf, mesh=mesh_ddf))"
]
},
{
"cell_type": "markdown",
"metadata": {},