Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

category_modulo and category_binning #927

Merged
merged 9 commits into from
Nov 11, 2020
189 changes: 156 additions & 33 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,158 @@ def apply(self, df):
else:
return df[self.column].values

class CategoryPreprocess(Preprocess):
"""Base class for categorizing preprocessors."""
@property
def cat_column(self):
"""Returns name of categorized column"""
return self.column

def categories(self, input_dshape):
"""Returns list of categories corresponding to input shape"""
raise NotImplementedError("categories not implemented")

def validate(self, in_dshape):
"""Validates input shape"""
raise NotImplementedError("validate not implemented")

class category_codes(Preprocess):
def apply(self, df):
"""Applies preprocessor to DataFrame and returns array"""
raise NotImplementedError("apply not implemented")

class category_codes(CategoryPreprocess):
"""Extract just the category codes from a categorical column."""
def categories(self, input_dshape):
return input_dshape.measure[self.column].categories

def validate(self, in_dshape):
if not self.column in in_dshape.dict:
raise ValueError("specified column not found")
if not isinstance(in_dshape.measure[self.column], ct.Categorical):
raise ValueError("input must be categorical")

def apply(self, df):
if cudf and isinstance(df, cudf.DataFrame):
return df[self.column].cat.codes.to_gpu_array()
else:
return df[self.column].cat.codes.values

class category_values(Preprocess):
"""Extract multiple columns from a dataframe as a numpy array of values."""
def __init__(self, columns):
self.columns = list(columns)
class category_modulo(category_codes):
"""
A variation on category_codes that assigns categories using an integer column, modulo a base.
Category is computed as (column_value - offset)%modulo.
"""

# couldn't find anything in the datashape docs about how to check if a CType is an integer, so just define a big set
IntegerTypes = {ct.bool_, ct.uint8, ct.uint16, ct.uint32, ct.uint64, ct.int8, ct.int16, ct.int32, ct.int64}

def __init__(self, column, modulo, offset=0):
super().__init__(column)
self.offset = offset
self.modulo = modulo

def _hashable_inputs(self):
return super()._hashable_inputs() + (self.offset, self.modulo)

def categories(self, in_dshape):
return list(range(self.modulo))

def validate(self, in_dshape):
if not self.column in in_dshape.dict:
raise ValueError("specified column not found")
if in_dshape.measure[self.column] not in self.IntegerTypes:
raise ValueError("input must be an integer column")

def apply(self, df):
result = (df[self.column] - self.offset) % self.modulo
if cudf and isinstance(df, cudf.DataFrame):
return result.to_gpu_array()
else:
return result.values

class category_binning(category_modulo):
"""
A variation on category_codes that assigns categories by binning a continuously-valued column.
jbednar marked this conversation as resolved.
Show resolved Hide resolved
The number of categories returned is always nbins+1.
The last category (nbin) is for NaNs in the data column, as well as for values under/over the binned
interval (when include_under or include_over is False).

Parameters
----------
column: column to use
bin0: lower bound of first bin
binsize: bin size
jbednar marked this conversation as resolved.
Show resolved Hide resolved
nbins: number of bins
include_under: if True, values below bin 0 are assigned to category 0
include_over: if True, values above the last bin (nbins-1) are assigned to category nbin-1
"""

def __init__(self, column, bin0, binsize, nbins, include_under=True, include_over=True):
super().__init__(column, nbins + 1) # +1 category for NaNs and clipped values
self.bin0 = bin0
self.binsize = binsize
self.nbins = nbins
self.bin_under = 0 if include_under else nbins
self.bin_over = nbins-1 if include_over else nbins

def _hashable_inputs(self):
return super()._hashable_inputs() + (self.bin0, self.binsize, self.bin_under, self.bin_over)

def validate(self, in_dshape):
if not self.column in in_dshape.dict:
raise ValueError("specified column not found")

def apply(self, df):
if cudf and isinstance(df, cudf.DataFrame):
## dunno how to do this in CUDA
raise NotImplementedError("this feature is not implemented in cuda")
else:
value = df[self.column].values
index = ((value - self.bin0) / self.binsize).astype(int)
index[index < 0] = self.bin_under
index[index >= self.nbins] = self.bin_over
index[np.isnan(value)] = self.nbins
return index


class category_values(CategoryPreprocess):
"""Extract a category and a value column from a dataframe as (2,N) numpy array of values."""
def __init__(self, categorizer, value_column):
super().__init__(value_column)
self.categorizer = categorizer

@property
def inputs(self):
return self.columns
return (self.categorizer.column, self.column)

@property
def cat_column(self):
"""Returns name of categorized column"""
return self.categorizer.column

def categories(self, input_dshape):
return self.categorizer.categories

def validate(self, in_dshape):
return self.categorizer.validate(in_dshape)

def apply(self, df):
a = self.categorizer.apply(df)
if cudf and isinstance(df, cudf.DataFrame):
import cupy
if df[self.columns[1]].dtype.kind == 'f':
if df[self.column].dtype.kind == 'f':
nullval = np.nan
else:
nullval = 0
a = cupy.asarray(df[self.columns[0]].cat.codes.to_gpu_array())
b = cupy.asarray(df[self.columns[1]].to_gpu_array(fillna=nullval))
a = cupy.asarray(a)
b = cupy.asarray(df[self.column].to_gpu_array(fillna=nullval))
return cupy.stack((a, b), axis=-1)
else:
a = df[self.columns[0]].cat.codes.values
b = df[self.columns[1]].values
b = df[self.column].values
return np.stack((a, b), axis=-1)



class Reduction(Expr):
"""Base class for per-bin reductions."""
def __init__(self, column=None):
Expand Down Expand Up @@ -139,26 +258,37 @@ def _finalize(bases, cuda=False, **kwargs):
return xr.DataArray(bases[0], **kwargs)

class by(Reduction):
"""Apply the provided reduction separately per categorical ``column`` value.
"""Apply the provided reduction separately per category.
Parameters
----------
column : str
Name of the column to aggregate over. Column data type must be
categorical. Resulting aggregate has an outer dimension axis along the
categories present.
cats: str or CategoryPreprocess instance
Name of column to aggregate over, or a categorizer object that returns categories.
Resulting aggregate has an outer dimension axis along the categories present.
reduction : Reduction
Per-category reduction function.
"""
def __init__(self, cat_column, reduction):
self.columns = (cat_column, getattr(reduction, 'column', None))
def __init__(self, cats, reduction):
jbednar marked this conversation as resolved.
Show resolved Hide resolved
# set basic categorizer
if isinstance(cats, CategoryPreprocess):
self.categorizer = cats
elif isinstance(cats, str):
self.categorizer = category_codes(cats)
else:
raise TypeError("first argument must be a column name or a CategoryPreprocess instance")
self.column = self.categorizer.column # for backwards compatibility with count_cat
self.columns = (self.categorizer.column, getattr(reduction, 'column', None))
self.reduction = reduction
self.column = cat_column # for backwards compatibility with count_cat
# if a value column is supplied, set category_values preprocessor
if self.val_column is not None:
self.preprocess = category_values(self.categorizer, self.val_column)
else:
self.preprocess = self.categorizer

def __hash__(self):
return hash((type(self), self._hashable_inputs(), self.reduction))
return hash((type(self), self._hashable_inputs(), self.categorizer._hashable_inputs(), self.reduction))

def _build_temps(self, cuda=False):
return tuple(by(self.cat_column, tmp) for tmp in self.reduction._build_temps(cuda))
return tuple(by(self.categorizer, tmp) for tmp in self.reduction._build_temps(cuda))

@property
def cat_column(self):
Expand All @@ -169,24 +299,17 @@ def val_column(self):
return self.columns[1]

def validate(self, in_dshape):
if not self.cat_column in in_dshape.dict:
raise ValueError("specified column not found")
if not isinstance(in_dshape.measure[self.cat_column], ct.Categorical):
raise ValueError("input must be categorical")

self.preprocess.validate(in_dshape)
self.reduction.validate(in_dshape)

def out_dshape(self, input_dshape):
cats = input_dshape.measure[self.cat_column].categories
cats = self.categorizer.categories(input_dshape)
red_shape = self.reduction.out_dshape(input_dshape)
return dshape(Record([(c, red_shape) for c in cats]))

@property
def inputs(self):
if self.val_column is not None:
return (category_values(self.columns),)
else:
return (category_codes(self.columns[0]),)
return (self.preprocess, )

def _build_create(self, out_dshape):
n_cats = len(out_dshape.measure.fields)
Expand All @@ -197,7 +320,7 @@ def _build_bases(self, cuda=False):
bases = self.reduction._build_bases(cuda)
if len(bases) == 1 and bases[0] is self:
return bases
return tuple(by(self.cat_column, base) for base in bases)
return tuple(by(self.categorizer, base) for base in bases)

def _build_append(self, dshape, schema, cuda=False):
return self.reduction._build_append(dshape, schema, cuda)
Expand All @@ -206,7 +329,7 @@ def _build_combine(self, dshape):
return self.reduction._combine

def _build_finalize(self, dshape):
cats = list(dshape[self.cat_column].categories)
cats = list(self.categorizer.categories(dshape))

def finalize(bases, cuda=False, **kwargs):
kwargs['dims'] += [self.cat_column]
Expand Down