
Optimize range calculation operations #344

Merged: 11 commits, May 8, 2017
datashader/core.py (4 changes: 2 additions & 2 deletions)

@@ -149,8 +149,8 @@ def __init__(self, plot_width=600, plot_height=600,
                  x_axis_type='linear', y_axis_type='linear'):
         self.plot_width = plot_width
         self.plot_height = plot_height
-        self.x_range = tuple(x_range) if x_range is not None else x_range
-        self.y_range = tuple(y_range) if y_range is not None else y_range
+        self.x_range = None if x_range is None else tuple(x_range)
+        self.y_range = None if y_range is None else tuple(y_range)
         self.x_axis = _axis_lookup[x_axis_type]
         self.y_axis = _axis_lookup[y_axis_type]

datashader/dask.py (4 changes: 2 additions & 2 deletions)

@@ -25,8 +25,8 @@ def dask_pipeline(df, schema, canvas, glyph, summary):


 def shape_bounds_st_and_axis(df, canvas, glyph):
-    x_range = canvas.x_range or glyph._compute_x_bounds(df)
-    y_range = canvas.y_range or glyph._compute_y_bounds(df)
+    x_range = canvas.x_range or glyph._compute_x_bounds_dask(df)
+    y_range = canvas.y_range or glyph._compute_y_bounds_dask(df)
     x_min, x_max, y_min, y_max = bounds = compute(*(x_range + y_range))
     x_range, y_range = (x_min, x_max), (y_min, y_max)
     width = canvas.plot_width
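A note on the dask path above: the bounds are built as lazy reductions and then evaluated with a single compute(*(x_range + y_range)) call, which lets dask run all four reductions in one scheduler pass and share work between them. A minimal sketch of that pattern, using dask.array as a stand-in for the dataframe columns (illustrative, not the datashader code itself):

    import dask
    import dask.array as da
    import numpy as np

    xs = da.from_array(np.array([3.0, np.nan, 1.0]), chunks=2)
    ys = da.from_array(np.array([5.0, 7.0, np.nan]), chunks=2)

    # Nothing is computed yet: these are lazy reductions.
    x_range = (da.nanmin(xs), da.nanmax(xs))
    y_range = (da.nanmin(ys), da.nanmax(ys))

    # One compute() evaluates all four reductions together, as in
    # shape_bounds_st_and_axis above.
    x_min, x_max, y_min, y_max = dask.compute(*(x_range + y_range))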
datashader/glyphs.py (43 changes: 39 additions & 4 deletions)

@@ -1,6 +1,7 @@
 from __future__ import absolute_import, division

 from toolz import memoize
+from dask.dataframe.hashing import hash_pandas_object
 import numpy as np

 from .core import Expr
@@ -28,11 +29,45 @@ def validate(self, in_dshape):
         elif not isreal(in_dshape.measure[self.y]):
             raise ValueError('y must be real')

-    def _compute_x_bounds(self, df):
-        return df[self.x].min(), df[self.x].max()
+    @staticmethod
+    @ngjit
+    def _compute_x_bounds(xs):
+        minval = maxval = xs[0]

Review comment (martindurant): This is good in many cases, but probably falls over if the first item happens to be nan. Another little loop to skip over any initial nan values would do here, and I suppose there should be a way to raise an error if it happens that all values are nan.

Contributor Author: Good catch @martindurant. Those cases are covered now. Also added a check for the empty-array condition.

Contributor Author: Also, I'm on vacation this week - feel free to make additional changes to this PR as you see fit.

+        for x in xs:
+            if not np.isnan(x):
+                if x < minval:
+                    minval = x
+                elif x > maxval:
+                    maxval = x
+        return minval, maxval
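The diff above is outdated relative to the thread: it still seeds minval/maxval from xs[0], which breaks if the first element is nan. A minimal sketch of the kind of guard the author describes, skipping leading nans and rejecting empty or all-nan input (illustrative only, not the actual follow-up commit):

    import numpy as np

    def compute_bounds(values):
        # Illustrative: skip leading nans before seeding min/max, and
        # fail loudly on empty or all-nan input.
        n = len(values)
        if n == 0:
            raise ValueError('cannot compute bounds of an empty array')
        i = 0
        while i < n and np.isnan(values[i]):
            i += 1
        if i == n:
            raise ValueError('all values are nan')
        minval = maxval = values[i]
        for j in range(i + 1, n):
            v = values[j]
            if not np.isnan(v):
                if v < minval:
                    minval = v
                elif v > maxval:
                    maxval = v
        return minval, maxval

    compute_bounds(np.array([np.nan, 3.0, 1.0]))  # -> (1.0, 3.0)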

+    @staticmethod
+    @ngjit
+    def _compute_y_bounds(ys):
+        minval = maxval = ys[0]
+        for y in ys:
+            if not np.isnan(y):
+                if y < minval:
+                    minval = y
+                elif y > maxval:
+                    maxval = y
+        return minval, maxval

-    def _compute_y_bounds(self, df):
-        return df[self.y].min(), df[self.y].max()
+    @memoize
+    def _compute_x_bounds_dask(self, df):
+        """Like ``PointLike._compute_x_bounds``, but memoized because
+        ``df`` is immutable/hashable (a Dask dataframe).
+        """
+        xs = df[self.x].values
+        return np.nanmin(xs), np.nanmax(xs)
+
+    @memoize
+    def _compute_y_bounds_dask(self, df):
+        """Like ``PointLike._compute_y_bounds``, but memoized because
+        ``df`` is immutable/hashable (a Dask dataframe).
+        """
+        ys = df[self.y].values
+        return np.nanmin(ys), np.nanmax(ys)


 class Point(_PointLike):
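The _dask variants above are wrapped in toolz's memoize, which caches return values keyed on the call arguments; the hash_pandas_object import presumably supports treating the dataframe as a hashable cache key, so repeated aggregations of the same dataframe skip rebuilding the bounds. A tiny illustration of that caching behaviour, with a plain string standing in for the dataframe:

    from toolz import memoize

    calls = []

    @memoize
    def bounds_for(df_key):
        # Stand-in for the nanmin/nanmax work; a real call receives a
        # (hashable) dask dataframe rather than a string.
        calls.append(df_key)
        return (0.0, 1.0)

    bounds_for('df')
    bounds_for('df')          # served from the cache
    assert calls == ['df']    # the body ran only once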
datashader/pandas.py (4 changes: 2 additions & 2 deletions)

@@ -15,8 +15,8 @@ def pandas_pipeline(df, schema, canvas, glyph, summary):
     y_mapper = canvas.y_axis.mapper
     extend = glyph._build_extend(x_mapper, y_mapper, info, append)

-    x_range = canvas.x_range or glyph._compute_x_bounds(df)
-    y_range = canvas.y_range or glyph._compute_y_bounds(df)
+    x_range = canvas.x_range or glyph._compute_x_bounds(df[glyph.x].values)
+    y_range = canvas.y_range or glyph._compute_y_bounds(df[glyph.y].values)
     width = canvas.plot_width
     height = canvas.plot_height

datashader/tests/test_glyphs.py (4 changes: 2 additions & 2 deletions)

@@ -10,8 +10,8 @@
 def test_point_bounds_check():
     df = pd.DataFrame({'x': [1, 2, 3], 'y': [5, 6, 7]})
     p = Point('x', 'y')
-    assert p._compute_x_bounds(df) == (1, 3)
-    assert p._compute_y_bounds(df) == (5, 7)
+    assert p._compute_x_bounds(df['x'].values) == (1, 3)
+    assert p._compute_y_bounds(df['y'].values) == (5, 7)


 def test_point_validate():
examples/filetimes.py (13 changes: 9 additions & 4 deletions)

@@ -227,11 +227,15 @@ def timed_read(filepath,dftype):
     return df, duration


-def timed_agg(df, filepath, plot_width=int(900), plot_height=int(900*7.0/12)):
+CACHED_RANGES = (None, None)
+def timed_agg(df, filepath, plot_width=int(900), plot_height=int(900*7.0/12), cache_ranges=True):
+    global CACHED_RANGES
     start = time.time()
-    cvs = ds.Canvas(plot_width, plot_height)
+    cvs = ds.Canvas(plot_width, plot_height, x_range=CACHED_RANGES[0], y_range=CACHED_RANGES[1])
     agg = cvs.points(df, p.x, p.y)
     end = time.time()
+    if cache_ranges:
+        CACHED_RANGES = (cvs.x_range, cvs.y_range)
     img = export_image(tf.shade(agg),filepath,export_path=".")
     return img, end-start

@@ -273,6 +277,7 @@ def main(argv):
     parser.add_argument('--debug', action='store_true', help='Enable increased verbosity and DEBUG messages')
     parser.add_argument('--cache', choices=('persist', 'cachey'), default=None, help='Enable caching: "persist" causes Dask dataframes to force loading into memory; "cachey" uses dask.cache.Cache with a cachesize of {}. Caching is disabled by default'.format(int(p.cachesize)))
     parser.add_argument('--distributed', action='store_true', help='Enable the distributed scheduler instead of the threaded, which is the default.')
+    parser.add_argument('--recalc-ranges', action='store_true', help='Tell datashader to recalculate the ranges on each aggregation, instead of caching them (caching is the default).')
     args = parser.parse_args(argv[1:])

     if args.cache is None:

@@ -321,7 +326,7 @@ def main(argv):
     if DEBUG:
         print('DEBUG: Memory usage (after read):\t{} MB'.format(get_proc_mem(), flush=True))

-    img,aggtime1 = timed_agg(df,filepath,5,5)
+    img,aggtime1 = timed_agg(df,filepath,5,5,cache_ranges=(not args.recalc_ranges))
     if DEBUG:
         mem_usage = df.memory_usage(deep=True)
         if p.dftype == 'dask':
@@ -333,7 +338,7 @@
             print('DEBUG: column "{}" dtype: {}'.format(colname, df[colname].dtype))
         print('DEBUG: Memory usage (after agg1):\t{} MB'.format(get_proc_mem(), flush=True))

-    img,aggtime2 = timed_agg(df,filepath)
+    img,aggtime2 = timed_agg(df,filepath,cache_ranges=(not args.recalc_ranges))
     if DEBUG:
         print('DEBUG: Memory usage (after agg2):\t{} MB'.format(get_proc_mem(), flush=True))

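With the CACHED_RANGES global above, only the first timed_agg call pays for the range computation; later calls construct the Canvas with the cached ranges, so the min/max scan is skipped. The pattern in isolation (illustrative names, not the datashader API):

    import numpy as np

    CACHED_RANGES = (None, None)

    def get_ranges(xs, ys, cache_ranges=True):
        global CACHED_RANGES
        x_range, y_range = CACHED_RANGES
        if x_range is None or y_range is None:
            # Expensive full scan; only happens on the first call.
            x_range = (np.nanmin(xs), np.nanmax(xs))
            y_range = (np.nanmin(ys), np.nanmax(ys))
        if cache_ranges:
            CACHED_RANGES = (x_range, y_range)
        return x_range, y_range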