Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap rolling sum updateBy operation #3399

Merged
merged 4 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 88 additions & 4 deletions py/server/deephaven/updateby.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def ema_tick_decay(time_scale_ticks: int, cols: Union[str, List[str]],

Args:
time_scale_ticks (int): the decay rate in ticks
cols (Union[str, List[str]]): the column(s) to be operated on, can be renaming expressions, i.e. "new_col = col"
cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output,
i.e. "new_col = col"; when empty, update_by performs the ema operation on all columns.
op_control (OperationControl): defines how special cases should behave, when None, the default OperationControl
settings as specified in :meth:`~OperationControl.__init__` will be used

Expand Down Expand Up @@ -140,7 +141,8 @@ def ema_time_decay(ts_col: str, time_scale: Union[int, str], cols: Union[str, Li

time_scale (Union[int, str]): the decay rate, can be expressed as an integer in nanoseconds or a time
interval string, e.g. "00:00:00.001"
cols (Union[str, List[str]]): the column(s) to be operated on, can be renaming expressions, i.e. "new_col = col"
cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output,
i.e. "new_col = col"; when empty, update_by performs the ema operation on all columns.
op_control (OperationControl): defines how special cases should behave, when None, the default OperationControl
settings as specified in :meth:`~OperationControl.__init__` will be used

Expand All @@ -151,8 +153,7 @@ def ema_time_decay(ts_col: str, time_scale: Union[int, str], cols: Union[str, Li
DHError
"""
try:
if isinstance(time_scale, str):
time_scale = _JDateTimeUtils.expressionToNanos(time_scale)
time_scale = _JDateTimeUtils.expressionToNanos(time_scale) if isinstance(time_scale, str) else time_scale
cols = to_sequence(cols)
if op_control is None:
return UpdateByOperation(j_updateby_op=_JUpdateByOperation.Ema(ts_col, time_scale, *cols))
Expand Down Expand Up @@ -267,3 +268,86 @@ def forward_fill(cols: Union[str, List[str]]) -> UpdateByOperation:
return UpdateByOperation(j_updateby_op=_JUpdateByOperation.Fill(cols))
except Exception as e:
raise DHError(e, "failed to create a forward fill UpdateByOperation.") from e


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing this made me wonder if we made an API mistake. For the rolling sum, ema, etc, we have separate methods for different flavors (e.g. ema_tick_decay and ema_time_decay). It seems like it might be more pythonic to have ema and rolling_sum, where the flavor is determined by the inputs. I'm not sure how strong my thoughts are, so this is more of a discussion item.
@jmao-denver @jjbrosnan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see the benefits of folding them into one function, but I prefer to have separate functions (especially when the parameter sets are quite different), as it is more explicit and cleaner, leaves less chance for users to make mistakes as far as arguments are concerned, and is also easier for us to document and maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind the explicitness of the ticks/time signatures but I have near-zero python experience in a professional role.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the parameter sets need to be that different. If these were combined into one function, you could have the first argument ts_col be optional with a default value of None. The rest of the arguments could look like they do in the time-based rolling sum functions, where each argument must be an int or string. In a tick case, you give ints. In the time case, if you give an int, it assumes seconds, and if you give a string, it can be whatever.

Then, the try/except block at the end can be separated into two parts, which depend on whether or not a timestamp column is given.

I think the benefits of combining each into one function outweigh the drawbacks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially, for the rolling sum, it would be (without comments):

def rolling_sum(ts_col: str = None, *, cols: Union[str, List[str]], rev_time: Union[int, str],
                fwd_time: Union[int, str] = 0) -> UpdateByOperation:
    """Creates a rolling sum UpdateByOperation, windowed by time when ts_col is given and by ticks (rows) otherwise.

    Args:
        ts_col (str): the timestamp column; when None or empty, the window is measured in ticks, default is None
        cols (Union[str, List[str]]): the column(s) to be operated on; keyword-only, because a parameter without a
            default value cannot follow the defaulted ts_col in a positional signature
        rev_time (Union[int, str]): the look-behind window size; with ts_col it is an integer in nanoseconds or a
            time interval string, without ts_col it must be an integer number of ticks
        fwd_time (Union[int, str]): the look-forward window size, in the same unit as rev_time, default is 0

    Returns:
        an UpdateByOperation

    Raises:
        DHError
    """
    if ts_col:
        try:
            cols = to_sequence(cols)
            rev_time = _JDateTimeUtils.expressionToNanos(rev_time) if isinstance(rev_time, str) else rev_time
            fwd_time = _JDateTimeUtils.expressionToNanos(fwd_time) if isinstance(fwd_time, str) else fwd_time
            return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(ts_col, rev_time, fwd_time, *cols))
        except Exception as e:
            raise DHError(e, "failed to create a rolling sum (time) UpdateByOperation.") from e
    else:
        try:
            cols = to_sequence(cols)
            # Time interval strings make no sense for row-based windows, so fail with a clear message
            # instead of handing a string to the tick-based overload.
            if isinstance(rev_time, str) or isinstance(fwd_time, str):
                raise ValueError("time interval strings are only valid when ts_col is provided.")
            return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(rev_time, fwd_time, *cols))
        except Exception as e:
            raise DHError(e, "failed to create a rolling sum (tick) UpdateByOperation.") from e

Copy link
Contributor

@lbooker42 lbooker42 Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jjbrosnan some extra testing in the ticks case will be needed. Time strings need to be rejected as they make no sense for row-based computations. You will get an exception from the call, so as long as the error is clear this looks good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is doable (we'll need to add checks for invalid argument combinations), but to me the extra cost and the potential cognitive burden on a user to understand the difference outweigh the gains.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also having ts_col as optional pretty much forces the users to use keyword arguments in the tick case, which may not be a bad thing but then it is not consistent with the time case.

def rolling_sum_tick(cols: Union[str, List[str]], rev_ticks: int, fwd_ticks: int = 0) -> UpdateByOperation:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmao-denver thoughts on the argument ordering vs other methods in this module? e.g. did we do ema_tick_decay in the wrong order?

Copy link
Contributor Author

@jmao-denver jmao-denver Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter ordering of ema_tick_decay/ema_time_decay matches the Java ones more closely, which is nice. But it is not something we can do with rolling sums. I have no objection to making them consistent if you feel strongly about it. Even though it is a breaking change, I doubt we'll affect users so much as our own docs/examples.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also would prefer a closer match with Java/Groovy for rolling_sum, but I think the argument is about optional parameter(s). If we want to harmonize, it makes as much sense to me to update the Java signatures and move these optional parameters to the end of the signature.

"""Creates a rolling sum UpdateByOperation for the supplied column names, using ticks as the windowing unit. Ticks
are row counts, and you may specify the reverse and forward window in number of rows to include. The current row
is considered to belong to the reverse window but not the forward window. Also, negative values are allowed and
can be used to generate completely forward or completely reverse windows.

Here are some examples of window values:
rev_ticks = 1, fwd_ticks = 0 - contains only the current row
rev_ticks = 10, fwd_ticks = 0 - contains 9 previous rows and the current row
rev_ticks = 0, fwd_ticks = 10 - contains the following 10 rows, excludes the current row
rev_ticks = 10, fwd_ticks = 10 - contains the previous 9 rows, the current row and the 10 rows following
rev_ticks = 10, fwd_ticks = -5 - contains 5 rows, beginning at 9 rows before, ending at 5 rows before the
current row (inclusive)
rev_ticks = 11, fwd_ticks = -1 - contains 10 rows, beginning at 10 rows before, ending at 1 row before the
current row (inclusive)
rev_ticks = -5, fwd_ticks = 10 - contains 5 rows, beginning 5 rows following, ending at 10 rows following the
current row (inclusive)


Args:
cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output,
i.e. "new_col = col"; when empty, update_by performs the rolling sum operation on all columns.
rev_ticks (int): the look-behind window size (in rows/ticks)
fwd_ticks (int): the look-forward window size (in rows/ticks), default is 0

Returns:
an UpdateByOperation

Raises:
DHError
"""
try:
cols = to_sequence(cols)
return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(rev_ticks, fwd_ticks, *cols))
except Exception as e:
raise DHError(e, "failed to create a rolling sum (tick) UpdateByOperation.") from e


def rolling_sum_time(ts_col: str, cols: Union[str, List[str]], rev_time: Union[int, str],
                     fwd_time: Union[int, str] = 0) -> UpdateByOperation:
    """Creates a rolling sum UpdateByOperation for the supplied column names, using time as the windowing unit. This
    function accepts nanoseconds or time strings as the reverse and forward window parameters. Negative values are
    allowed and can be used to generate completely forward or completely reverse windows. A row containing a null in
    the timestamp column belongs to no window and will not have a value computed or be considered in the windows of
    other rows.

    Here are some examples of window values:
        rev_time = 0, fwd_time = 0 - contains rows that exactly match the current row timestamp
        rev_time = "00:10:00", fwd_time = 0 - contains rows from 10m before through the current row timestamp
            (inclusive)
        rev_time = 0, fwd_time = 600_000_000_000 - contains rows from the current row through 10m following the
            current row timestamp (inclusive)
        rev_time = "00:10:00", fwd_time = "00:10:00" - contains rows from 10m before through 10m following
            the current row timestamp (inclusive)
        rev_time = "00:10:00", fwd_time = "-00:05:00" - contains rows from 10m before through 5m before the
            current row timestamp (inclusive), this is a purely backwards looking window
        rev_time = "-00:05:00", fwd_time = "00:10:00" - contains rows from 5m following through 10m
            following the current row timestamp (inclusive), this is a purely forwards looking window

    Args:
        ts_col (str): the name of the timestamp column against which the time windows are evaluated
        cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output,
            e.g. "new_col = col"; when empty, update_by performs the rolling sum operation on all columns.
        rev_time (Union[int, str]): the look-behind window size, can be expressed as an integer in nanoseconds or a
            time interval string, e.g. "00:00:00.001"
        fwd_time (Union[int, str]): the look-ahead window size, can be expressed as an integer in nanoseconds or a
            time interval string, e.g. "00:00:00.001", default is 0

    Returns:
        an UpdateByOperation

    Raises:
        DHError
    """
    try:
        cols = to_sequence(cols)
        # Strings are converted to nanoseconds; integers are passed through unchanged.
        rev_time = _JDateTimeUtils.expressionToNanos(rev_time) if isinstance(rev_time, str) else rev_time
        fwd_time = _JDateTimeUtils.expressionToNanos(fwd_time) if isinstance(fwd_time, str) else fwd_time
        return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(ts_col, rev_time, fwd_time, *cols))
    except Exception as e:
        raise DHError(e, "failed to create a rolling sum (time) UpdateByOperation.") from e
2 changes: 1 addition & 1 deletion py/server/tests/test_ugp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def setUp(self) -> None:
ugp.auto_locking = False

def tearDown(self):
ugp.auto_locking = False
ugp.auto_locking = True
super().tearDown()

def test_ugp_context_manager(self):
Expand Down
123 changes: 77 additions & 46 deletions py/server/tests/test_updateby.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from deephaven import read_csv, time_table, ugp
from deephaven.updateby import ema_tick_decay, BadDataBehavior, MathContext, OperationControl, ema_time_decay, cum_sum, \
cum_prod, cum_min, cum_max, forward_fill
cum_prod, cum_min, cum_max, forward_fill, rolling_sum_tick, rolling_sum_time
from tests.testbase import BaseTestCase


Expand All @@ -28,22 +28,21 @@ def test_ema(self):
on_nan=BadDataBehavior.RESET,
big_value_context=MathContext.UNLIMITED)

ema_ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"),
ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl),
ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"),
ema_time_decay(ts_col="Timestamp", time_scale="00:00:00.001", cols="ema_c = c",
op_control=op_ctrl),
]
ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"),
ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl),
ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"),
ema_time_decay(ts_col="Timestamp", time_scale="00:00:00.001", cols="ema_c = c",
op_control=op_ctrl),
]

for ema_op in ema_ops:
with self.subTest(ema_op):
for op in ops:
with self.subTest(op):
for t in (self.static_table, self.ticking_table):
rt = t.update_by(ops=op, by="b")
self.assertTrue(rt.is_refreshing is t.is_refreshing)
self.assertEqual(len(rt.columns), 1 + len(t.columns))
with ugp.exclusive_lock():
ema_table = t.update_by(ops=ema_op, by="b")
self.assertTrue(ema_table.is_refreshing is t.is_refreshing)
self.assertEqual(len(ema_table.columns), 1 + len(t.columns))
with ugp.exclusive_lock():
self.assertEqual(ema_table.size, t.size)
self.assertEqual(rt.size, t.size)

def test_simple_ops(self):
op_builders = [cum_sum, cum_prod, cum_min, cum_max, forward_fill]
Expand All @@ -52,11 +51,11 @@ def test_simple_ops(self):
for op_builder in op_builders:
with self.subTest(op_builder):
for t in (self.static_table, self.ticking_table):
rt = t.update_by(ops=op_builder(pairs), by="e")
self.assertTrue(rt.is_refreshing is t.is_refreshing)
self.assertEqual(len(rt.columns), 2 + len(t.columns))
with ugp.exclusive_lock():
updateby_table = t.update_by(ops=op_builder(pairs), by="e")
self.assertTrue(updateby_table.is_refreshing is t.is_refreshing)
self.assertEqual(len(updateby_table.columns), 2 + len(t.columns))
self.assertEqual(updateby_table.size, t.size)
self.assertEqual(rt.size, t.size)

def test_simple_ops_proxy(self):
op_builders = [cum_sum, cum_prod, cum_min, cum_max, forward_fill]
Expand All @@ -68,51 +67,83 @@ def test_simple_ops_proxy(self):
for op_builder in op_builders:
with self.subTest(op_builder):
for pt_proxy in pt_proxies:
if pt_proxy.is_refreshing:
ugp.auto_locking = True
updateby_proxy = pt_proxy.update_by(ops=op_builder(pairs), by="e")
rt_proxy = pt_proxy.update_by(ops=op_builder(pairs), by="e")

self.assertTrue(updateby_proxy.is_refreshing is pt_proxy.is_refreshing)
self.assertEqual(len(updateby_proxy.target.constituent_table_columns),
self.assertTrue(rt_proxy.is_refreshing is pt_proxy.is_refreshing)
self.assertEqual(len(rt_proxy.target.constituent_table_columns),
2 + len(pt_proxy.target.constituent_table_columns))

for ct, ub_ct in zip(pt_proxy.target.constituent_tables, updateby_proxy.target.constituent_tables):
for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables):
with ugp.exclusive_lock():
self.assertEqual(ct.size, ub_ct.size)

if pt_proxy.is_refreshing:
ugp.auto_locking = False
self.assertEqual(ct.size, rct.size)

def test_ema_proxy(self):
op_ctrl = OperationControl(on_null=BadDataBehavior.THROW,
on_nan=BadDataBehavior.RESET,
big_value_context=MathContext.UNLIMITED)

ema_ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"),
ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl),
ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"),
ema_time_decay(ts_col="Timestamp", time_scale=100, cols="ema_c = c",
op_control=op_ctrl),
]
ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"),
ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl),
ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"),
ema_time_decay(ts_col="Timestamp", time_scale=100, cols="ema_c = c",
op_control=op_ctrl),
]
pt_proxies = [self.static_table.partition_by("b").proxy(),
self.ticking_table.partition_by("b").proxy(),
]

for ema_op in ema_ops:
with self.subTest(ema_op):
for op in ops:
with self.subTest(op):
for pt_proxy in pt_proxies:
if pt_proxy.is_refreshing:
ugp.auto_locking = True

ema_proxy = pt_proxy.update_by(ema_op, by="e")
for ct, ema_ct in zip(pt_proxy.target.constituent_tables, ema_proxy.target.constituent_tables):
self.assertTrue(ema_ct.is_refreshing is ct.is_refreshing)
self.assertEqual(len(ema_ct.columns), 1 + len(ct.columns))
rt_proxy = pt_proxy.update_by(op, by="e")
for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables):
self.assertTrue(rct.is_refreshing is ct.is_refreshing)
self.assertEqual(len(rct.columns), 1 + len(ct.columns))
with ugp.exclusive_lock():
self.assertEqual(ct.size, ema_ct.size)
self.assertEqual(ct.size, rct.size)

def test_rolling_sum(self):
    """Checks tick- and time-windowed rolling sums against both the static and the ticking table.

    For every operation the result must keep the source's refreshing flag, gain exactly two
    columns (one per renamed output), and have the same number of rows as the source.
    """
    tick_ops = [
        rolling_sum_tick(rev_ticks=10, cols=["rsum_a = a", "rsum_d = d"]),
        rolling_sum_tick(rev_ticks=10, fwd_ticks=10, cols=["rsum_a = a", "rsum_d = d"]),
    ]
    time_ops = [
        rolling_sum_time(ts_col="Timestamp", rev_time="00:00:10", cols=["rsum_b = b", "rsum_e = e"]),
        # integer windows are nanoseconds: 10s look-behind, -1s look-ahead
        rolling_sum_time(ts_col="Timestamp", rev_time=10_000_000_000, fwd_time=-1_000_000_000,
                         cols=["rsum_b = b", "rsum_e = e"]),
        rolling_sum_time(ts_col="Timestamp", rev_time="00:00:30", fwd_time="-00:00:20",
                         cols=["rsum_b = b", "rsum_e = e"]),
    ]

    for op in tick_ops + time_ops:
        with self.subTest(op):
            for source in (self.static_table, self.ticking_table):
                result = source.update_by(ops=op, by="c")
                self.assertTrue(result.is_refreshing is source.is_refreshing)
                self.assertEqual(len(result.columns), 2 + len(source.columns))
                # sizes are compared under the exclusive lock
                with ugp.exclusive_lock():
                    self.assertEqual(result.size, source.size)

def test_rolling_sum_proxy(self):
    """Checks tick- and time-windowed rolling sums applied through partitioned-table proxies.

    Both the static and the ticking table are partitioned by column b; every operation is then
    applied with by="c". Each result constituent must keep the refreshing flag of its source
    constituent, gain exactly two columns, and have the same number of rows.
    """
    ops = [
        rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10),
        rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10, fwd_ticks=10),
        rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10"),
        # integer windows are nanoseconds; -1_000_000_000 is a -1s look-ahead
        rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10",
                         fwd_time=-1_000_000_000),
    ]

    pt_proxies = [self.static_table.partition_by("b").proxy(),
                  self.ticking_table.partition_by("b").proxy(),
                  ]

    for op in ops:
        with self.subTest(op):
            for pt_proxy in pt_proxies:
                rt_proxy = pt_proxy.update_by(op, by="c")
                for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables):
                    self.assertTrue(rct.is_refreshing is ct.is_refreshing)
                    self.assertEqual(len(rct.columns), 2 + len(ct.columns))
                    # sizes are compared under the exclusive lock
                    with ugp.exclusive_lock():
                        self.assertEqual(ct.size, rct.size)


if __name__ == '__main__':
Expand Down