Commit
Merge pull request #161 from realratchet/master
fix timedeltas and non-primitive filter
realratchet authored Apr 8, 2024
2 parents 27f39c6 + eb63209 commit 4d58d36
Showing 5 changed files with 238 additions and 27 deletions.
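Taken together, the changes cover two failure modes: the nim-side numpy reader had no branch for timedelta64 pages ('m8[s]', 'm8[ms]', 'm8[us]'), and the nim filter's value converter had no branch for Python None. A rough sketch of the calls this commit is meant to unlock, assuming tablite's documented Table API and the module-level filter defined in tablite/redux.py below (the column name is illustrative, not from the diff):

from datetime import timedelta
from tablite import Table
from tablite.redux import filter as table_filter

t = Table()
t["delta"] = [timedelta(seconds=1), timedelta(milliseconds=250)]

# before this commit the nim page reader fell through to implement()
# on 'm8[...]' pages; now timedelta columns can be filtered natively
trues, falses = table_filter(
    t, [{"column1": "delta", "criteria": "==", "column2": "delta"}]
)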
2 changes: 1 addition & 1 deletion nimlite.nimble
@@ -1,6 +1,6 @@
# Package

version = "0.3.0"
version = "0.3.1"
author = "Ratchet"
description = "Utilities for tablite to work with nim"
license = "MIT"
1 change: 1 addition & 0 deletions nimlite/funcs/filter.nim
@@ -147,6 +147,7 @@ proc filter*(table: nimpy.PyObject, pyExpressions: seq[nimpy.PyObject], filterTy
let pyType = builtins.getTypeName(pyVal)
let obj: PY_ObjectND = (
case pyType
of "NoneType": PY_None
of "int": newPY_Object(pyVal.to(int))
of "float": newPY_Object(pyVal.to(float))
of "bool": newPY_Object(pyVal.to(bool))
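With the "NoneType" branch in place, a filter expression can carry None as its literal value instead of failing during conversion (the "non-primitive filter" part of the commit title). A minimal sketch, assuming the filter API defined in tablite/redux.py later in this commit; the column name is illustrative:

from tablite import Table
from tablite.redux import filter as table_filter

t = Table()
t["maybe"] = [1, None, 3]

# value2=None is now converted to PY_None on the nim side
trues, falses = table_filter(
    t, [{"column1": "maybe", "criteria": "==", "value2": None}]
)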
79 changes: 57 additions & 22 deletions nimlite/numpy.nim
Expand Up @@ -22,8 +22,10 @@ type NDArrayTypeDescriptor = enum
D_BOOLEAN
D_INT
D_FLOAT
-D_TIME
D_DATE_DAYS
+D_TIME_SECONDS
+D_TIME_MILISECONDS
+D_TIME_MICROSECONDS
D_DATETIME_SECONDS
D_DATETIME_MILISECONDS
D_DATETIME_MICROSECONDS
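For reference, the descriptor names follow numpy's dtype notation: 'm' is timedelta64, 'M' is datetime64, and the resolution unit sits in brackets. Plain numpy shows the strings that consumeDescr parses (little-endian form):

import numpy as np

np.dtype("timedelta64[s]").str    # '<m8[s]'  -> D_TIME_SECONDS
np.dtype("timedelta64[ms]").str   # '<m8[ms]' -> D_TIME_MILISECONDS
np.dtype("timedelta64[us]").str   # '<m8[us]' -> D_TIME_MICROSECONDS
np.dtype("datetime64[us]").str    # '<M8[us]' -> D_DATETIME_MICROSECONDS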
@@ -523,16 +525,20 @@ proc consumeDescr(header: var string, header_len: int, offset: var int): NDArray
descriptor = NDArrayTypeDescriptor.D_OBJECT
of 'm':
case dt_descriptor:
of "us": NDArrayTypeDescriptor.D_TIME_MICROSECONDS
of "ms": NDArrayTypeDescriptor.D_TIME_MILISECONDS
of "s": NDArrayTypeDescriptor.D_TIME_SECONDS
else: implement(descr)
of 'M':
case dt_descriptor:
of "D":
size = 8
descriptor = NDArrayTypeDescriptor.D_DATE_DAYS
of "us":
size = 8
descriptor = NDArrayTypeDescriptor.D_DATETIME_MICROSECONDS
else: implement(descr)
size = 8
descriptor = (
case dt_descriptor:
of "D": NDArrayTypeDescriptor.D_DATE_DAYS
of "us": NDArrayTypeDescriptor.D_DATETIME_MICROSECONDS
of "ms": NDArrayTypeDescriptor.D_DATETIME_MILISECONDS
of "s": NDArrayTypeDescriptor.D_DATETIME_SECONDS
else: implement(descr)
)
else:
size = parseInt(descr[type_offset+1..descr.len-1])

@@ -659,6 +665,33 @@ proc newDateTimeArray_Microseconds(fh: var File, endianness: Endianness, shape:

return DateTimeNDArray(buf: buf, shape: shape)

proc newTimeArray_Seconds(fh: var File, endianness: Endianness, shape: var Shape): ObjectNDArray {.inline.} =
let data = readPrimitiveBuffer[int64](fh, shape)
let dtypes = {K_TIME: data.len}.toTable
let buf = collect:
for v in data:
newPY_Object(seconds2Duration(float v))

return ObjectNDArray(buf: buf, shape: shape, dtypes: dtypes)

proc newTimeArray_Miliseconds(fh: var File, endianness: Endianness, shape: var Shape): ObjectNDArray {.inline.} =
let data = readPrimitiveBuffer[int64](fh, shape)
let dtypes = {K_TIME: data.len}.toTable
let buf = collect:
for v in data:
newPY_Object(seconds2Duration(float v / 1_000)) # milliseconds -> seconds

return ObjectNDArray(buf: buf, shape: shape, dtypes: dtypes)

proc newTimeArray_Microseconds(fh: var File, endianness: Endianness, shape: var Shape): ObjectNDArray {.inline.} =
let data = readPrimitiveBuffer[int64](fh, shape)
let dtypes = {K_TIME: data.len}.toTable
let buf = collect:
for v in data:
newPY_Object(seconds2Duration(float v / 1_000_000)) # microseconds -> seconds

return ObjectNDArray(buf: buf, shape: shape, dtypes: dtypes)

template newFloatNDArray(fh: var File, endianness: Endianness, size: int, shape: var Shape) =
case size:
of 4: Float32NDArray(buf: readPrimitiveBuffer[float32](fh, shape), shape: shape)
@@ -711,20 +744,22 @@ proc readPageInfo(fh: var File): (NDArrayDescriptor, bool, Shape) =

proc readNumpy(fh: var File): BaseNDArray =
var ((descrEndianness, descrType, descrSize), _, shape) = readPageInfo(fh)
-var page: BaseNDArray

-case descrType:
-of D_BOOLEAN: page = newBooleanNDArray(fh, shape)
-of D_INT: page = newIntNDArray(fh, descrEndianness, descrSize, shape)
-of D_FLOAT: page = newFloatNDArray(fh, descrEndianness, descrSize, shape)
-of D_UNICODE: page = newUnicodeNDArray(fh, descrEndianness, descrSize, shape)
-of D_OBJECT: page = newObjectNDArray(fh, descrEndianness, shape)
-of D_DATE_DAYS: page = newDateArray_Days(fh, descrEndianness, shape)
-of D_DATETIME_SECONDS: page = newDateTimeArray_Seconds(fh, descrEndianness, shape)
-of D_DATETIME_MILISECONDS: page = newDateTimeArray_Miliseconds(fh, descrEndianness, shape)
-of D_DATETIME_MICROSECONDS: page = newDateTimeArray_Microseconds(fh, descrEndianness, shape)
-else: implement($descrType)

+let page = (
+case descrType:
+of D_BOOLEAN: newBooleanNDArray(fh, shape)
+of D_INT: newIntNDArray(fh, descrEndianness, descrSize, shape)
+of D_FLOAT: newFloatNDArray(fh, descrEndianness, descrSize, shape)
+of D_UNICODE: newUnicodeNDArray(fh, descrEndianness, descrSize, shape)
+of D_OBJECT: newObjectNDArray(fh, descrEndianness, shape)
+of D_DATE_DAYS: newDateArray_Days(fh, descrEndianness, shape)
+of D_DATETIME_SECONDS: newDateTimeArray_Seconds(fh, descrEndianness, shape)
+of D_DATETIME_MILISECONDS: newDateTimeArray_Miliseconds(fh, descrEndianness, shape)
+of D_DATETIME_MICROSECONDS: newDateTimeArray_Microseconds(fh, descrEndianness, shape)
+of D_TIME_SECONDS: newTimeArray_Seconds(fh, descrEndianness, shape)
+of D_TIME_MILISECONDS: newTimeArray_Miliseconds(fh, descrEndianness, shape)
+of D_TIME_MICROSECONDS: newTimeArray_Microseconds(fh, descrEndianness, shape)
+)
return page

proc readNumpy*(path: string): BaseNDArray =
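The three newTimeArray_* readers differ only in the divisor that normalises the raw int64 payload to seconds before a Duration is built. A minimal Python sketch of the same conversion, with datetime.timedelta standing in for nim's Duration (the helper name is hypothetical):

from datetime import timedelta

def payload_to_timedelta(raw: int, unit: str) -> timedelta:
    # raw is one int64 value from a '<m8[unit]' numpy page
    divisor = {"s": 1, "ms": 1_000, "us": 1_000_000}[unit]
    return timedelta(seconds=raw / divisor)

payload_to_timedelta(1_500, "ms")      # -> 1.5 seconds
payload_to_timedelta(2_250_000, "us")  # -> 2.25 seconds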
181 changes: 178 additions & 3 deletions tablite/redux.py
@@ -1,10 +1,11 @@
from tablite.base import BaseTable
import numpy as np
+import warnings
from tablite.utils import sub_cls_check, type_check, expression_interpreter
from tablite.mp_utils import filter_ops
from tablite.datatypes import list_to_np_array
from tablite.config import Config
-from tablite.nimlite import filter as _filter_using_list_of_dicts
+from tablite.nimlite import filter as _filter_using_list_of_dicts_native
from tqdm import tqdm as _tqdm


@@ -163,10 +164,184 @@ def _compress_both(T, mask, pbar: _tqdm):
pbar.update(pbar_step)
return true, false

def _filter_using_list_of_dicts(T, expressions, filter_type, pbar: _tqdm):
"""
enables filtering across columns for multiple criteria.
expressions:
str: Expression that can be compiled and executed row by row.
example: "all((A==B and C!=4 and 200<D))"
list of dicts: (example):
L = [
{'column1':'A', 'criteria': "==", 'column2': 'B'},
{'column1':'C', 'criteria': "!=", "value2": '4'},
{'value1': 200, 'criteria': "<", 'column2': 'D' }
]
accepted dictionary keys: 'column1', 'column2', 'criteria', 'value1', 'value2'
filter_type: 'all' or 'any'
"""
for expression in expressions:
if not isinstance(expression, dict):
raise TypeError(f"invalid expression: {expression}")
if not len(expression) == 3:
raise ValueError(f"expected 3 items, got {expression}")
x = {"column1", "column2", "criteria", "value1", "value2"}
if not set(expression.keys()).issubset(x):
raise ValueError(f"got unknown key: {set(expression.keys()).difference(x)}")

if expression["criteria"] not in filter_ops:
raise ValueError(f"criteria missing from {expression}")

c1 = expression.get("column1", None)
if c1 is not None and c1 not in T.columns:
raise ValueError(f"no such column: {c1}")

v1 = expression.get("value1", None)
if v1 is not None and c1 is not None:
raise ValueError("filter can only take 1 left expr element. Got 2.")

c2 = expression.get("column2", None)
if c2 is not None and c2 not in T.columns:
raise ValueError(f"no such column: {c2}")

v2 = expression.get("value2", None)
if v2 is not None and c2 is not None:
raise ValueError("filter can only take 1 right expression element. Got 2.")

if not isinstance(filter_type, str):
raise TypeError()
if filter_type not in {"all", "any"}:
raise ValueError(f"filter_type: {filter_type} not in ['all', 'any']")

# EVALUATION....
# 1. setup a rectangular bitmap for evaluations
bitmap = np.empty(shape=(len(expressions), len(T)), dtype=bool)
pbar_div = (len(expressions) * len(list(Config.page_steps(len(T)))) - 1)
pbar_step = (10 / pbar_div) if pbar_div != 0 else 0
# 2. create tasks for evaluations
for bit_index, expression in enumerate(expressions):
assert isinstance(expression, dict)
assert len(expression) == 3
c1 = expression.get("column1", None)
c2 = expression.get("column2", None)
expr = expression.get("criteria", None)
assert expr in filter_ops
v1 = expression.get("value1", None)
v2 = expression.get("value2", None)

for start, end in Config.page_steps(len(T)):
if c1 is not None:
dset_A = T[c1][start:end]
else: # v1 is active:
dset_A = np.array([v1] * (end - start))

if c2 is not None:
dset_B = T[c2][start:end]
else: # v2 is active:
dset_B = np.array([v2] * (end - start))

if len(dset_A) != len(dset_B):
raise ValueError(
f"Assymmetric dataset: {c1} has {len(dset_A)} values, whilst {c2} has {len(dset_B)} values."
)
# Evaluate
try:
if expr == ">":
result = dset_A > dset_B
elif expr == ">=":
result = dset_A >= dset_B
elif expr == "==":
result = dset_A == dset_B
elif expr == "<":
result = dset_A < dset_B
elif expr == "<=":
result = dset_A <= dset_B
elif expr == "!=":
result = dset_A != dset_B
else: # it's a python evaluation (slow)
f = filter_ops.get(expr)
assert callable(f)
result = list_to_np_array([f(a, b) for a, b in zip(dset_A, dset_B)])
except TypeError:
def safe_test(f, a, b):
try:
return f(a, b)
except TypeError:
return False
f = filter_ops.get(expr)
assert callable(f)
result = list_to_np_array([safe_test(f, a, b) for a, b in zip(dset_A, dset_B)])
bitmap[bit_index, start:end] = result
pbar.update(pbar_step)

f = np.all if filter_type == "all" else np.any
mask = f(bitmap, axis=0)
# 4. The mask is now created; the bitmap is no longer needed.
pbar.update(10 - pbar.n)
return mask
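# Illustrative only, not part of redux.py: a list-of-dicts filter for a
# hypothetical table t with columns A, B, C and D, mirroring the docstring:
#
#   expressions = [
#       {"column1": "A", "criteria": "==", "column2": "B"},
#       {"column1": "C", "criteria": "!=", "value2": 4},
#       {"value1": 200, "criteria": "<", "column2": "D"},
#   ]
#   mask = _filter_using_list_of_dicts(t, expressions, "all", pbar)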

def filter_non_primitive(T, expressions, filter_type="all", tqdm=_tqdm):
"""
OBSOLETE
filters table
Args:
T (Table subclass): Table.
expressions (list or str):
str:
filters based on an expression, such as:
"all((A==B, C!=4, 200<D))"
which is interpreted using python's compiler to:
def _f(A,B,C,D):
return all((A==B, C!=4, 200<D))
list of dicts: (example):
L = [
{'column1':'A', 'criteria': "==", 'column2': 'B'},
{'column1':'C', 'criteria': "!=", "value2": '4'},
{'value1': 200, 'criteria': "<", 'column2': 'D' }
]
accepted dictionary keys: 'column1', 'column2', 'criteria', 'value1', 'value2'
filter_type (str, optional): Ignored if expressions is str.
'all' or 'any'. Defaults to "all".
tqdm (tqdm, optional): progressbar. Defaults to _tqdm.
Returns:
2xTables: trues, falses
"""
# determine method
warnings.warn("Filter using non-primitive types is not recommended.")
sub_cls_check(T, BaseTable)
if len(T) == 0:
return T.copy(), T.copy()

with tqdm(desc="filter", total=20) as pbar:
if isinstance(expressions, str):
mask = _filter_using_expression(T, expressions)
pbar.update(10)
elif isinstance(expressions, list):
mask = _filter_using_list_of_dicts(T, expressions, filter_type, pbar)
else:
raise TypeError
# create new tables
res = _compress_both(T, mask, pbar=pbar)
pbar.update(pbar.total - pbar.n)

return res

def filter(T, expressions, filter_type="all", tqdm=_tqdm):
"""filters table
Note: At the moment only tablite primitive types are supported
Args:
T (Table subclass): Table.
@@ -209,7 +384,7 @@ def _f(A,B,C,D):
res = _compress_both(T, mask, pbar=pbar)
pbar.update(pbar.total - pbar.n)
elif isinstance(expressions, list):
-return _filter_using_list_of_dicts(T, expressions, filter_type, tqdm)
+return _filter_using_list_of_dicts_native(T, expressions, filter_type, tqdm)
else:
raise TypeError
# create new tables
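For the str form of expressions, the docstring's example is compiled into a plain function over the named columns via expression_interpreter, imported at the top of this file. A sketch under the assumption that the helper keeps its (expression, column_names) signature:

from tablite.utils import expression_interpreter

f = expression_interpreter("all((A==B, C!=4, 200<D))", ["A", "B", "C", "D"])
f(1, 1, 5, 300)   # True: A==B, C!=4 and 200<D all hold
f(1, 2, 4, 100)   # False: A != B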
2 changes: 1 addition & 1 deletion tablite/version.py
@@ -1,3 +1,3 @@
-major, minor, patch = 2023, 11, 0
+major, minor, patch = 2023, 11, 1
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)
