From dae9d391f24d2b421714b826e8d8be22b21c6d51 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 2 Feb 2023 11:02:20 -0700 Subject: [PATCH 1/4] Wrap rolling sum updateBy operation --- py/server/deephaven/updateby.py | 85 +++++++++++++++++++++- py/server/tests/test_ugp.py | 2 +- py/server/tests/test_updateby.py | 121 +++++++++++++++++++------------ 3 files changed, 159 insertions(+), 49 deletions(-) diff --git a/py/server/deephaven/updateby.py b/py/server/deephaven/updateby.py index c6d46c694df..1d74d517f05 100644 --- a/py/server/deephaven/updateby.py +++ b/py/server/deephaven/updateby.py @@ -151,8 +151,7 @@ def ema_time_decay(ts_col: str, time_scale: Union[int, str], cols: Union[str, Li DHError """ try: - if isinstance(time_scale, str): - time_scale = _JDateTimeUtils.expressionToNanos(time_scale) + time_scale = _JDateTimeUtils.expressionToNanos(time_scale) if isinstance(time_scale, str) else time_scale cols = to_sequence(cols) if op_control is None: return UpdateByOperation(j_updateby_op=_JUpdateByOperation.Ema(ts_col, time_scale, *cols)) @@ -267,3 +266,85 @@ def forward_fill(cols: Union[str, List[str]]) -> UpdateByOperation: return UpdateByOperation(j_updateby_op=_JUpdateByOperation.Fill(cols)) except Exception as e: raise DHError(e, "failed to create a forward fill UpdateByOperation.") from e + + +def rolling_sum_tick(cols: Union[str, List[str]], rev_ticks: int, fwd_ticks: int = 0) -> UpdateByOperation: + """Creates a rolling sum UpdateByOperation for the supplied column names, using ticks as the windowing unit. Ticks + are row counts and you may specify the reverse and forward window in number of rows to include. The current row + is considered to belong to the reverse window but not the forward window. Also, negative values are allowed and + can be used to generate completely forward or completely reverse windows. + + Here are some examples of window values: + rev_ticks = 1, fwd_ticks = 0 - contains only the current row + rev_ticks = 10, fwd_ticks = 0 - contains 9 previous rows and the current row + rev_ticks = 0, fwd_ticks = 10 - contains the following 10 rows, excludes the current row + rev_ticks = 10, fwd_ticks = 10 - contains the previous 9 rows, the current row and the 10 rows following + rev_ticks = 10, fwd_ticks = -5 - contains 5 rows, beginning at 9 rows before, ending at 5 rows before the + current row (inclusive) + rev_ticks = 11, fwd_ticks = -1 - contains 10 rows, beginning at 10 rows before, ending at 1 row before the + current row (inclusive) + rev_ticks = -5, fwd_ticks = 10 - contains 5 rows, beginning 5 rows following, ending at 10 rows following the + current row (inclusive) + + + Args: + cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, + i.e. "new_col = col"; when empty, update_by perform the forward fill operation on all columns. + rev_ticks (int): the look-behind window size (in rows/ticks) + fwd_ticks (int): the look-forward window size (int rows/ticks), default is 0 + + Returns: + an UpdateByOperation + + Raises: + DHError + """ + try: + cols = to_sequence(cols) + return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(rev_ticks, fwd_ticks, *cols)) + except Exception as e: + raise DHError(e, "failed to create a rolling sum (tick) UpdateByOperation.") from e + + +def rolling_sum_time(ts_col: str, cols: Union[str, List[str]], rev_time: Union[int, str], + fwd_time: Union[int, str] = 0) -> UpdateByOperation: + """Creates a rolling sum UpdateByOperation for the supplied column names, using time as the windowing unit. This + function accepts nanoseconds as the reverse and forward window parameters. Negative values are allowed and can be + used to generate completely forward or completely reverse windows. A row containing a null in the timestamp + column belongs to no window and will not have a value computed or be considered in the windows of other rows. + + Here are some examples of window values: + rev_time = 0, fwd_time = 0 - contains rows that exactly match the current row timestamp + rev_time = 600_000_000_000, fwd_time = 0 - contains rows from 10m earlier through the current row timestamp ( + inclusive) + rev_time = 0, fwd_time = 600_000_000_000 - contains rows from the current row through 10m following the + current row timestamp (inclusive) + rev_time = 600_000_000_000, fwd_time = 600_000_000_000 - contains rows from 10m earlier through 10m following + the current row timestamp (inclusive) + rev_time = 600_000_000_000, fwd_time = -30_000_000_000 - contains rows from 10m earlier through 5m before the + current * row timestamp (inclusive), this is a purely backwards looking window + rev_time = -300_000_000_000, fwd_time = 600_000_000_000} - contains rows from 5m following through 10m + following the current row timestamp (inclusive), this is a purely forwards looking window + + Args: + ts_col (str): + cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, + i.e. "new_col = col"; when empty, update_by perform the forward fill operation on all columns. + rev_time (int): the look-behind window size, can be expressed as an integer in nanoseconds or a time + interval string, e.g. "00:00:00.001" + fwd_time (int): the look-ahead window size, can be expressed as an integer in nanoseconds or a time + interval string, e.g. "00:00:00.001", default is 0 + + Returns: + an UpdateByOperation + + Raises: + DHError + """ + try: + cols = to_sequence(cols) + rev_time = _JDateTimeUtils.expressionToNanos(rev_time) if isinstance(rev_time, str) else rev_time + fwd_time = _JDateTimeUtils.expressionToNanos(fwd_time) if isinstance(fwd_time, str) else fwd_time + return UpdateByOperation(j_updateby_op=_JUpdateByOperation.RollingSum(ts_col, rev_time, fwd_time, *cols)) + except Exception as e: + raise DHError(e, "failed to create a rolling sum (time) UpdateByOperation.") from e diff --git a/py/server/tests/test_ugp.py b/py/server/tests/test_ugp.py index 4b2897bca50..176bbbb0586 100644 --- a/py/server/tests/test_ugp.py +++ b/py/server/tests/test_ugp.py @@ -26,7 +26,7 @@ def setUp(self) -> None: ugp.auto_locking = False def tearDown(self): - ugp.auto_locking = False + ugp.auto_locking = True super().tearDown() def test_ugp_context_manager(self): diff --git a/py/server/tests/test_updateby.py b/py/server/tests/test_updateby.py index 58c1f15a1f3..d232547e3ce 100644 --- a/py/server/tests/test_updateby.py +++ b/py/server/tests/test_updateby.py @@ -6,7 +6,7 @@ from deephaven import read_csv, time_table, ugp from deephaven.updateby import ema_tick_decay, BadDataBehavior, MathContext, OperationControl, ema_time_decay, cum_sum, \ - cum_prod, cum_min, cum_max, forward_fill + cum_prod, cum_min, cum_max, forward_fill, rolling_sum_tick, rolling_sum_time from tests.testbase import BaseTestCase @@ -28,22 +28,21 @@ def test_ema(self): on_nan=BadDataBehavior.RESET, big_value_context=MathContext.UNLIMITED) - ema_ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"), - ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl), - ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"), - ema_time_decay(ts_col="Timestamp", time_scale="00:00:00.001", cols="ema_c = c", - op_control=op_ctrl), - ] + ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"), + ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl), + ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"), + ema_time_decay(ts_col="Timestamp", time_scale="00:00:00.001", cols="ema_c = c", + op_control=op_ctrl), + ] - for ema_op in ema_ops: - with self.subTest(ema_op): + for op in ops: + with self.subTest(op): for t in (self.static_table, self.ticking_table): + rt = t.update_by(ops=op, by="b") + self.assertTrue(rt.is_refreshing is t.is_refreshing) + self.assertEqual(len(rt.columns), 1 + len(t.columns)) with ugp.exclusive_lock(): - ema_table = t.update_by(ops=ema_op, by="b") - self.assertTrue(ema_table.is_refreshing is t.is_refreshing) - self.assertEqual(len(ema_table.columns), 1 + len(t.columns)) - with ugp.exclusive_lock(): - self.assertEqual(ema_table.size, t.size) + self.assertEqual(rt.size, t.size) def test_simple_ops(self): op_builders = [cum_sum, cum_prod, cum_min, cum_max, forward_fill] @@ -52,11 +51,11 @@ def test_simple_ops(self): for op_builder in op_builders: with self.subTest(op_builder): for t in (self.static_table, self.ticking_table): + rt = t.update_by(ops=op_builder(pairs), by="e") + self.assertTrue(rt.is_refreshing is t.is_refreshing) + self.assertEqual(len(rt.columns), 2 + len(t.columns)) with ugp.exclusive_lock(): - updateby_table = t.update_by(ops=op_builder(pairs), by="e") - self.assertTrue(updateby_table.is_refreshing is t.is_refreshing) - self.assertEqual(len(updateby_table.columns), 2 + len(t.columns)) - self.assertEqual(updateby_table.size, t.size) + self.assertEqual(rt.size, t.size) def test_simple_ops_proxy(self): op_builders = [cum_sum, cum_prod, cum_min, cum_max, forward_fill] @@ -68,51 +67,81 @@ def test_simple_ops_proxy(self): for op_builder in op_builders: with self.subTest(op_builder): for pt_proxy in pt_proxies: - if pt_proxy.is_refreshing: - ugp.auto_locking = True - updateby_proxy = pt_proxy.update_by(ops=op_builder(pairs), by="e") + rt_proxy = pt_proxy.update_by(ops=op_builder(pairs), by="e") - self.assertTrue(updateby_proxy.is_refreshing is pt_proxy.is_refreshing) - self.assertEqual(len(updateby_proxy.target.constituent_table_columns), + self.assertTrue(rt_proxy.is_refreshing is pt_proxy.is_refreshing) + self.assertEqual(len(rt_proxy.target.constituent_table_columns), 2 + len(pt_proxy.target.constituent_table_columns)) - for ct, ub_ct in zip(pt_proxy.target.constituent_tables, updateby_proxy.target.constituent_tables): + for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables): with ugp.exclusive_lock(): - self.assertEqual(ct.size, ub_ct.size) - - if pt_proxy.is_refreshing: - ugp.auto_locking = False + self.assertEqual(ct.size, rct.size) def test_ema_proxy(self): op_ctrl = OperationControl(on_null=BadDataBehavior.THROW, on_nan=BadDataBehavior.RESET, big_value_context=MathContext.UNLIMITED) - ema_ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"), - ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl), - ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"), - ema_time_decay(ts_col="Timestamp", time_scale=100, cols="ema_c = c", - op_control=op_ctrl), - ] + ops = [ema_tick_decay(time_scale_ticks=100, cols="ema_a = a"), + ema_tick_decay(time_scale_ticks=100, cols="ema_a = a", op_control=op_ctrl), + ema_time_decay(ts_col="Timestamp", time_scale=10, cols="ema_a = a"), + ema_time_decay(ts_col="Timestamp", time_scale=100, cols="ema_c = c", + op_control=op_ctrl), + ] pt_proxies = [self.static_table.partition_by("b").proxy(), self.ticking_table.partition_by("b").proxy(), ] - for ema_op in ema_ops: - with self.subTest(ema_op): + for op in ops: + with self.subTest(op): for pt_proxy in pt_proxies: - if pt_proxy.is_refreshing: - ugp.auto_locking = True - - ema_proxy = pt_proxy.update_by(ema_op, by="e") - for ct, ema_ct in zip(pt_proxy.target.constituent_tables, ema_proxy.target.constituent_tables): - self.assertTrue(ema_ct.is_refreshing is ct.is_refreshing) - self.assertEqual(len(ema_ct.columns), 1 + len(ct.columns)) + rt_proxy = pt_proxy.update_by(op, by="e") + for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables): + self.assertTrue(rct.is_refreshing is ct.is_refreshing) + self.assertEqual(len(rct.columns), 1 + len(ct.columns)) with ugp.exclusive_lock(): - self.assertEqual(ct.size, ema_ct.size) + self.assertEqual(ct.size, rct.size) + + def test_rolling_sum(self): + ops = [ + rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10), + rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10, fwd_ticks=10), + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10"), + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10", + fwd_time=-10_000_000_00), + ] + + for op in ops: + with self.subTest(op): + for t in (self.static_table, self.ticking_table): + rt = t.update_by(ops=op, by="c") + self.assertTrue(rt.is_refreshing is t.is_refreshing) + self.assertEqual(len(rt.columns), 2 + len(t.columns)) + with ugp.exclusive_lock(): + self.assertEqual(rt.size, t.size) + + def test_rolling_sum_proxy(self): + ops = [ + rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10), + rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10, fwd_ticks=10), + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10"), + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10", + fwd_time=-10_000_000_00), + ] - if pt_proxy.is_refreshing: - ugp.auto_locking = True + pt_proxies = [self.static_table.partition_by("b").proxy(), + self.ticking_table.partition_by("b").proxy(), + ] + + for op in ops: + with self.subTest(op): + for pt_proxy in pt_proxies: + rt_proxy = pt_proxy.update_by(op, by="c") + for ct, rct in zip(pt_proxy.target.constituent_tables, rt_proxy.target.constituent_tables): + self.assertTrue(rct.is_refreshing is ct.is_refreshing) + self.assertEqual(len(rct.columns), 2 + len(ct.columns)) + with ugp.exclusive_lock(): + self.assertEqual(ct.size, rct.size) if __name__ == '__main__': From 452d7bb2b28118cf174104f6babcb8c8ace2be82 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 2 Feb 2023 12:06:19 -0700 Subject: [PATCH 2/4] Respond to suggestions in review --- py/server/deephaven/updateby.py | 10 +++++----- py/server/tests/test_updateby.py | 4 +++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/py/server/deephaven/updateby.py b/py/server/deephaven/updateby.py index 1d74d517f05..694da9c4553 100644 --- a/py/server/deephaven/updateby.py +++ b/py/server/deephaven/updateby.py @@ -315,16 +315,16 @@ def rolling_sum_time(ts_col: str, cols: Union[str, List[str]], rev_time: Union[i Here are some examples of window values: rev_time = 0, fwd_time = 0 - contains rows that exactly match the current row timestamp - rev_time = 600_000_000_000, fwd_time = 0 - contains rows from 10m earlier through the current row timestamp ( + rev_time = "00:10:00", fwd_time = "0" - contains rows from 10m earlier through the current row timestamp ( inclusive) rev_time = 0, fwd_time = 600_000_000_000 - contains rows from the current row through 10m following the current row timestamp (inclusive) - rev_time = 600_000_000_000, fwd_time = 600_000_000_000 - contains rows from 10m earlier through 10m following + rev_time = "00:10:00", fwd_time = "00:10:00" - contains rows from 10m earlier through 10m following the current row timestamp (inclusive) - rev_time = 600_000_000_000, fwd_time = -30_000_000_000 - contains rows from 10m earlier through 5m before the + rev_time = "00:10:00", fwd_time = "-00:05:00" - contains rows from 10m earlier through 5m before the current * row timestamp (inclusive), this is a purely backwards looking window - rev_time = -300_000_000_000, fwd_time = 600_000_000_000} - contains rows from 5m following through 10m - following the current row timestamp (inclusive), this is a purely forwards looking window + rev_time = "-00:05:00", fwd_time = "00:10:00"} - contains rows from 5m following through 10m + following the current row timestamp (inclusive), this is a purely forwards looking window Args: ts_col (str): diff --git a/py/server/tests/test_updateby.py b/py/server/tests/test_updateby.py index d232547e3ce..8a9d0445025 100644 --- a/py/server/tests/test_updateby.py +++ b/py/server/tests/test_updateby.py @@ -107,8 +107,10 @@ def test_rolling_sum(self): rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10), rolling_sum_tick(cols=["rsum_a = a", "rsum_d = d"], rev_ticks=10, fwd_ticks=10), rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10"), - rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:10", + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time=10_000_000_000, fwd_time=-10_000_000_00), + rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:30", + fwd_time="-00:00:30"), ] for op in ops: From 11ab764d3922720f9fd0edacb4d71ee0446c3984 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 2 Feb 2023 12:14:19 -0700 Subject: [PATCH 3/4] Make test case more sensible --- py/server/tests/test_updateby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/server/tests/test_updateby.py b/py/server/tests/test_updateby.py index 8a9d0445025..4519a3f104d 100644 --- a/py/server/tests/test_updateby.py +++ b/py/server/tests/test_updateby.py @@ -110,7 +110,7 @@ def test_rolling_sum(self): rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time=10_000_000_000, fwd_time=-10_000_000_00), rolling_sum_time(ts_col="Timestamp", cols=["rsum_b = b", "rsum_e = e"], rev_time="00:00:30", - fwd_time="-00:00:30"), + fwd_time="-00:00:20"), ] for op in ops: From 1ba0b9bf93e73d137ccc7827f327551e957d500d Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Fri, 3 Feb 2023 10:41:01 -0700 Subject: [PATCH 4/4] Improve doc strings --- py/server/deephaven/updateby.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/py/server/deephaven/updateby.py b/py/server/deephaven/updateby.py index 694da9c4553..8cfa1221b22 100644 --- a/py/server/deephaven/updateby.py +++ b/py/server/deephaven/updateby.py @@ -105,7 +105,8 @@ def ema_tick_decay(time_scale_ticks: int, cols: Union[str, List[str]], Args: time_scale_ticks (int): the decay rate in ticks - cols (Union[str, List[str]]): the column(s) to be operated on, can be renaming expressions, i.e. "new_col = col" + cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, + i.e. "new_col = col"; when empty, update_by perform the ema operation on all columns. op_control (OperationControl): defines how special cases should behave, when None, the default OperationControl settings as specified in :meth:`~OperationControl.__init__` will be used @@ -140,7 +141,8 @@ def ema_time_decay(ts_col: str, time_scale: Union[int, str], cols: Union[str, Li time_scale (Union[int, str]): the decay rate, can be expressed as an integer in nanoseconds or a time interval string, e.g. "00:00:00.001" - cols (Union[str, List[str]]): the column(s) to be operated on, can be renaming expressions, i.e. "new_col = col" + cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, + i.e. "new_col = col"; when empty, update_by perform the ema operation on all columns. op_control (OperationControl): defines how special cases should behave, when None, the default OperationControl settings as specified in :meth:`~OperationControl.__init__` will be used @@ -270,7 +272,7 @@ def forward_fill(cols: Union[str, List[str]]) -> UpdateByOperation: def rolling_sum_tick(cols: Union[str, List[str]], rev_ticks: int, fwd_ticks: int = 0) -> UpdateByOperation: """Creates a rolling sum UpdateByOperation for the supplied column names, using ticks as the windowing unit. Ticks - are row counts and you may specify the reverse and forward window in number of rows to include. The current row + are row counts, and you may specify the reverse and forward window in number of rows to include. The current row is considered to belong to the reverse window but not the forward window. Also, negative values are allowed and can be used to generate completely forward or completely reverse windows. @@ -289,7 +291,7 @@ def rolling_sum_tick(cols: Union[str, List[str]], rev_ticks: int, fwd_ticks: int Args: cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, - i.e. "new_col = col"; when empty, update_by perform the forward fill operation on all columns. + i.e. "new_col = col"; when empty, update_by perform the rolling sum operation on all columns. rev_ticks (int): the look-behind window size (in rows/ticks) fwd_ticks (int): the look-forward window size (int rows/ticks), default is 0 @@ -308,28 +310,29 @@ def rolling_sum_tick(cols: Union[str, List[str]], rev_ticks: int, fwd_ticks: int def rolling_sum_time(ts_col: str, cols: Union[str, List[str]], rev_time: Union[int, str], fwd_time: Union[int, str] = 0) -> UpdateByOperation: - """Creates a rolling sum UpdateByOperation for the supplied column names, using time as the windowing unit. This - function accepts nanoseconds as the reverse and forward window parameters. Negative values are allowed and can be - used to generate completely forward or completely reverse windows. A row containing a null in the timestamp - column belongs to no window and will not have a value computed or be considered in the windows of other rows. + """Creates a rolling sum UpdateByOperation for the supplied column names, using time as the windowing unit. This + function accepts nanoseconds or time strings as the reverse and forward window parameters. Negative values are + allowed and can be used to generate completely forward or completely reverse windows. A row containing a null in + the timestamp column belongs to no window and will not have a value computed or be considered in the windows of + other rows. Here are some examples of window values: rev_time = 0, fwd_time = 0 - contains rows that exactly match the current row timestamp - rev_time = "00:10:00", fwd_time = "0" - contains rows from 10m earlier through the current row timestamp ( + rev_time = "00:10:00", fwd_time = "0" - contains rows from 10m before through the current row timestamp ( inclusive) rev_time = 0, fwd_time = 600_000_000_000 - contains rows from the current row through 10m following the current row timestamp (inclusive) - rev_time = "00:10:00", fwd_time = "00:10:00" - contains rows from 10m earlier through 10m following + rev_time = "00:10:00", fwd_time = "00:10:00" - contains rows from 10m before through 10m following the current row timestamp (inclusive) - rev_time = "00:10:00", fwd_time = "-00:05:00" - contains rows from 10m earlier through 5m before the - current * row timestamp (inclusive), this is a purely backwards looking window + rev_time = "00:10:00", fwd_time = "-00:05:00" - contains rows from 10m before through 5m before the + current row timestamp (inclusive), this is a purely backwards looking window rev_time = "-00:05:00", fwd_time = "00:10:00"} - contains rows from 5m following through 10m following the current row timestamp (inclusive), this is a purely forwards looking window Args: ts_col (str): cols (Union[str, List[str]]): the column(s) to be operated on, can include expressions to rename the output, - i.e. "new_col = col"; when empty, update_by perform the forward fill operation on all columns. + i.e. "new_col = col"; when empty, update_by perform the rolling sum operation on all columns. rev_time (int): the look-behind window size, can be expressed as an integer in nanoseconds or a time interval string, e.g. "00:00:00.001" fwd_time (int): the look-ahead window size, can be expressed as an integer in nanoseconds or a time