feat(py/client-ticking): Add support for LocalDate and LocalTime (deephaven#6106)

This PR adds support for `LocalDate` and `LocalTime` to
pydeephaven/ticking.

Note that this PR depends on deephaven#6105 and deephaven#6137, both of
which should be merged first.

It also adds a note to py/client/README.md reminding people to create a
venv.

This also fixes deephaven#6044.
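
For context, here is a minimal usage sketch (not part of this commit) of what the change enables on the client side. It assumes a Deephaven server reachable at the default `localhost:10000` and follows the listener pattern used in the new test below; after this change, `LocalDate` columns arrive as `pa.date64()` arrays and `LocalTime` columns as `pa.time64("ns")` arrays.

``` python
# Hypothetical sketch, not part of this commit: subscribe to a table with
# LocalDate/LocalTime columns and inspect the Arrow types pydeephaven delivers.
import pydeephaven as dh

session = dh.Session()  # assumes a server on localhost:10000
table = session.empty_table(size=3).update(formulas=[
    "LocalDates = '2001-03-01' + (i * 'P1D')",
    "LocalTimes = '12:34:56.000'.plus(ii * 'PT1S')",
])

def on_update(update):
    added = update.added()
    if len(added) == 0:
        return
    # With this PR, these print date64[ms] and time64[ns] respectively.
    print(added["LocalDates"].type, added["LocalTimes"].type)

handle = dh.listen(table, on_update)
handle.start()
# ... wait for an update, then tear down ...
handle.stop()
session.close()
```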
kosak authored Sep 26, 2024
1 parent 967b203 commit b404621
Showing 6 changed files with 242 additions and 12 deletions.
8 changes: 8 additions & 0 deletions py/client-ticking/README.md
@@ -96,6 +96,14 @@ package just built from the above steps, rather than one from PyPi.

## Testing the library

### Run tests
``` shell
$ python3 -m unittest discover tests
```

### Sample Python program

Run python from the venv while in this directory, and try this sample Python program:

``` python
16 changes: 16 additions & 0 deletions py/client-ticking/src/pydeephaven_ticking/_core.pxd
@@ -138,10 +138,18 @@ cdef extern from "deephaven/dhcore/types.h" namespace "deephaven::dhcore":
kBool "deephaven::dhcore::ElementTypeId::kBool"
kString "deephaven::dhcore::ElementTypeId::kString"
kTimestamp "deephaven::dhcore::ElementTypeId::kTimestamp"
kLocalDate "deephaven::dhcore::ElementTypeId::kLocalDate"
kLocalTime "deephaven::dhcore::ElementTypeId::kLocalTime"

cdef cppclass CDateTime "deephaven::dhcore::DateTime":
pass

cdef cppclass CLocalDate "deephaven::dhcore::LocalDate":
pass

cdef cppclass CLocalTime "deephaven::dhcore::LocalTime":
pass

cdef extern from "deephaven/dhcore/utility/cython_support.h" namespace "deephaven::dhcore::utility":
cdef cppclass CCythonSupport "deephaven::dhcore::utility::CythonSupport":
@staticmethod
@@ -157,6 +165,14 @@ cdef extern from "deephaven/dhcore/utility/cython_support.h" namespace "deephave
shared_ptr[CColumnSource] CreateDateTimeColumnSource(const int64_t *dataBegin, const int64_t *dataEnd,
const uint8_t *validityBegin, const uint8_t *validityEnd, size_t numElements)

@staticmethod
shared_ptr[CColumnSource] CreateLocalDateColumnSource(const int64_t *dataBegin, const int64_t *dataEnd,
const uint8_t *validityBegin, const uint8_t *validityEnd, size_t numElements)

@staticmethod
shared_ptr[CColumnSource] CreateLocalTimeColumnSource(const int64_t *dataBegin, const int64_t *dataEnd,
const uint8_t *validityBegin, const uint8_t *validityEnd, size_t numElements)

@staticmethod
ElementTypeId GetElementTypeId(const CColumnSource &columnSource)

77 changes: 68 additions & 9 deletions py/client-ticking/src/pydeephaven_ticking/_core.pyx
@@ -20,7 +20,7 @@ from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move, pair
from libcpp.vector cimport vector
from typing import Dict, List, Sequence, Union, cast
from typing import Sequence, cast

# Simple wrapper of the corresponding C++ TickingUpdate class.
cdef class TickingUpdate:
@@ -291,6 +291,16 @@ cdef class ColumnSource:
dest_data_as_int64 = dest_data.view(dtype=np.int64)
self._fill_timestamp_chunk(rows, dest_data_as_int64, null_flags_ptr)
arrow_type = pa.timestamp("ns", tz="UTC")
elif element_type_id == ElementTypeId.kLocalDate:
dest_data = np.zeros(size, dtype=np.int64)
dest_data_as_int64 = dest_data.view(dtype=np.int64)
self._fill_localdate_chunk(rows, dest_data_as_int64, null_flags_ptr)
arrow_type = pa.date64()
elif element_type_id == ElementTypeId.kLocalTime:
dest_data = np.zeros(size, dtype=np.int64)
dest_data_as_int64 = dest_data.view(dtype=np.int64)
self._fill_localtime_chunk(rows, dest_data_as_int64, null_flags_ptr)
arrow_type = pa.time64("ns")
else:
raise RuntimeError(f"Unexpected ElementTypeId {<int>element_type_id}")

@@ -329,13 +339,36 @@ cdef class ColumnSource:
# fill_chunk helper method for timestamp. In this case we shamelessly treat the Python timestamp
# type as an int64, and then further shamelessly pretend that it's a Deephaven DateTime type.
cdef _fill_timestamp_chunk(self, rows: RowSequence, int64_t[::1] dest_data, CGenericChunk[bool] *null_flags_ptr):
"""
static_assert(sizeof(int64_t) == sizeof(CDateTime));
"""
cdef extern from "<type_traits>":
"""
static_assert(deephaven::dhcore::DateTime::IsBlittableToInt64());
"""
rsSize = rows.size
dest_chunk = CGenericChunk[CDateTime].CreateView(<CDateTime*>&dest_data[0], rsSize)
deref(self.column_source).FillChunk(deref(rows.row_sequence), &dest_chunk, null_flags_ptr)

# fill_chunk helper method for LocalDate. In this case we shamelessly treat the underlying date
# value as an int64, and then further shamelessly pretend that it's a Deephaven LocalDate type.
cdef _fill_localdate_chunk(self, rows: RowSequence, int64_t[::1] dest_data, CGenericChunk[bool] *null_flags_ptr):
cdef extern from "<type_traits>":
"""
static_assert(deephaven::dhcore::LocalDate::IsBlittableToInt64());
"""
rsSize = rows.size
dest_chunk = CGenericChunk[CLocalDate].CreateView(<CLocalDate*>&dest_data[0], rsSize)
deref(self.column_source).FillChunk(deref(rows.row_sequence), &dest_chunk, null_flags_ptr)

# fill_chunk helper method for LocalTime. In this case we shamelessly treat the underlying time
# value as an int64, and then further shamelessly pretend that it's a Deephaven LocalTime type.
cdef _fill_localtime_chunk(self, rows: RowSequence, int64_t[::1] dest_data, CGenericChunk[bool] *null_flags_ptr):
cdef extern from "<type_traits>":
"""
static_assert(deephaven::dhcore::LocalTime::IsBlittableToInt64());
"""
rsSize = rows.size
dest_chunk = CGenericChunk[CLocalTime].CreateView(<CLocalTime*>&dest_data[0], rsSize)
deref(self.column_source).FillChunk(deref(rows.row_sequence), &dest_chunk, null_flags_ptr)

# Converts an Arrow array to a C++ ColumnSource of the right type. The created column source does not own the
# memory used, so it is only valid as long as the original Arrow array is valid.
cdef shared_ptr[CColumnSource] _convert_arrow_array_to_column_source(array: pa.Array) except *:
@@ -345,6 +378,10 @@ cdef shared_ptr[CColumnSource] _convert_arrow_array_to_column_source(array: pa.A
return _convert_arrow_boolean_array_to_column_source(cast(pa.lib.BooleanArray, array))
if isinstance(array, pa.lib.TimestampArray):
return _convert_arrow_timestamp_array_to_column_source(cast(pa.lib.TimestampArray, array))
if isinstance(array, pa.lib.Date64Array):
return _convert_arrow_date64_array_to_column_source(cast(pa.lib.Date64Array, array))
if isinstance(array, pa.lib.Time64Array):
return _convert_arrow_time64_array_to_column_source(cast(pa.lib.Time64Array, array))
buffers = array.buffers()
if len(buffers) != 2:
raise RuntimeError(f"Expected 2 simple type buffers, got {len(buffers)}")
@@ -427,10 +464,32 @@ cdef shared_ptr[CColumnSource] _convert_arrow_string_array_to_column_source(arra
# Converts an Arrow TimestampArray to a C++ DateTimeColumnSource. The created column source does not own the
# memory used, so it is only valid as long as the original Arrow array is valid.
cdef shared_ptr[CColumnSource] _convert_arrow_timestamp_array_to_column_source(array: pa.TimestampArray) except *:
return _convert_underlying_int64_to_column_source(array, CCythonSupport.CreateDateTimeColumnSource)

# Converts an Arrow Date64Array to a C++ LocalDateColumnSource. The created column source does not own the
# memory used, so it is only valid as long as the original Arrow array is valid.
cdef shared_ptr[CColumnSource] _convert_arrow_date64_array_to_column_source(array: pa.Date64Array) except *:
return _convert_underlying_int64_to_column_source(array, CCythonSupport.CreateLocalDateColumnSource)

# Converts an Arrow Time64Array to a C++ LocalTimeColumnSource. The created column source does not own the
# memory used, so it is only valid as long as the original Arrow array is valid.
cdef shared_ptr[CColumnSource] _convert_arrow_time64_array_to_column_source(array: pa.Time64Array) except *:
return _convert_underlying_int64_to_column_source(array, CCythonSupport.CreateLocalTimeColumnSource)

# Signature of one of the factory functions in CCythonSupport: CreateDateTimeColumnSource, CreateLocalDateColumnSource
# or CreateLocalTimeColumnSource.
ctypedef shared_ptr[CColumnSource](*factory_t)(const int64_t *, const int64_t *, const uint8_t *, const uint8_t *, size_t)

# Converts one of the numeric Arrow types with an underlying int64 representation to the
# corresponding ColumnSource type. The created column source does not own the
# memory used, so it is only valid as long as the original Arrow array is valid.
cdef shared_ptr[CColumnSource] _convert_underlying_int64_to_column_source(
array: pa.NumericArray,
factory: factory_t) except *:
num_elements = len(array)
buffers = array.buffers()
if len(buffers) != 2:
raise RuntimeError(f"Expected 2 timestamp buffers, got {len(buffers)}")
raise RuntimeError(f"Expected 2 buffers, got {len(buffers)}")
validity = buffers[0]
data = buffers[1]

@@ -442,9 +501,7 @@ cdef shared_ptr[CColumnSource] _convert_arrow_timestamp_array_to_column_source(a

cdef const int64_t *data_begin = <const int64_t *> <intptr_t> data.address
cdef const int64_t *data_end = <const int64_t *> <intptr_t> (data.address + data.size)

return CCythonSupport.CreateDateTimeColumnSource(data_begin, data_end, validity_begin, validity_end,
num_elements)
return factory(data_begin, data_end, validity_begin, validity_end, num_elements)

# This method converts a PyArrow Schema object to a C++ Schema object.
cdef shared_ptr[CSchema] _pyarrow_schema_to_deephaven_schema(src: pa.Schema) except *:
@@ -547,7 +604,9 @@ cdef _equivalentTypes = [
_EquivalentTypes.create(ElementTypeId.kDouble, pa.float64()),
_EquivalentTypes.create(ElementTypeId.kBool, pa.bool_()),
_EquivalentTypes.create(ElementTypeId.kString, pa.string()),
_EquivalentTypes.create(ElementTypeId.kTimestamp, pa.timestamp("ns", "UTC"))
_EquivalentTypes.create(ElementTypeId.kTimestamp, pa.timestamp("ns", "UTC")),
_EquivalentTypes.create(ElementTypeId.kLocalDate, pa.date64()),
_EquivalentTypes.create(ElementTypeId.kLocalTime, pa.time64("ns"))
]

# Converts a Deephaven type (an enum) into the corresponding PyArrow type.
138 changes: 138 additions & 0 deletions py/client-ticking/tests/test_ticking_all_types.py
@@ -0,0 +1,138 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#

import unittest

import pyarrow as pa
import pydeephaven as dh
import datetime as dt
import queue
from typing import List

class TickingAllTypesTestCase(unittest.TestCase):
queue = queue.Queue()
errors: List[str] = []

def test_ticking_basic_time_table(self):
session = dh.Session()
table = session.empty_table(size = 10).update(
formulas = [
"Chars = ii == 5 ? null : (char)('a' + ii)",
"Bytes = ii == 5 ? null : (byte)(ii)",
"Shorts = ii == 5 ? null : (short)(ii)",
"Ints = ii == 5 ? null : (int)(ii)",
"Longs = ii == 5 ? null : (long)(ii)",
"Floats = ii == 5 ? null : (float)(ii)",
"Doubles = ii == 5 ? null : (double)(ii)",
"Bools = ii == 5 ? null : ((ii % 2) == 0)",
"Strings = ii == 5 ? null : `hello ` + i",
"DateTimes = ii == 5 ? null : '2001-03-01T12:34:56Z' + ii",
"LocalDates = ii == 5 ? null : '2001-03-01' + (i * 'P1D')",
"LocalTimes = ii == 5 ? null : '12:34:56.000'.plus(ii * 'PT1S')"
])
session.bind_table(name="all_types_table", table=table)

listener_handle = dh.listen(table, self.handle_update)
listener_handle.start()

timed_out = False
try:
_ = self.queue.get(block = True, timeout = 10)
except queue.Empty:
timed_out = True

listener_handle.stop()
session.close()

if len(self.errors) != 0:
self.fail("\n".join(self.errors))

if timed_out:
self.fail("Test timed out")

def handle_update(self, update):
added = update.added()
if len(added) == 0:
return

expected_char_data = []
expected_byte_data = []
expected_short_data = []
expected_int_data = []
expected_long_data = []
expected_float_data = []
expected_double_data = []
expected_bool_data = []
expected_string_data = []
expected_date_time_data = []
expected_local_date_data = []
expected_local_time_data = []

date_time_base = pa.scalar(dt.datetime(2001, 3, 1, 12, 34, 56), type=pa.timestamp("ns", tz="UTC"))
date_time_nanos = date_time_base.value

# Use a datetime, do arithmetic on it, then pull out the time component
local_time_base = dt.datetime(2001, 1, 1, 12, 34, 56)

for i in range(10):
expected_char_data.append(ord('a') + i)
expected_byte_data.append(i)
expected_short_data.append(i)
expected_int_data.append(i)
expected_long_data.append(i)
expected_float_data.append(i)
expected_double_data.append(i)
expected_bool_data.append((i % 2) == 0)
expected_string_data.append(f"hello {i}")
expected_date_time_data.append(date_time_nanos + i)
expected_local_date_data.append(dt.datetime(2001, 3, 1) + dt.timedelta(days = i))
expected_local_time_data.append((local_time_base + dt.timedelta(seconds = i)).time())
expected_char_data[5] = None
expected_byte_data[5] = None
expected_short_data[5] = None
expected_int_data[5] = None
expected_long_data[5] = None
expected_float_data[5] = None
expected_double_data[5] = None
expected_bool_data[5] = None
expected_string_data[5] = None
expected_date_time_data[5] = None
expected_local_date_data[5] = None
expected_local_time_data[5] = None

expected_chars = pa.array(expected_char_data, pa.uint16())
expected_bytes = pa.array(expected_byte_data, pa.int8())
expected_shorts = pa.array(expected_short_data, pa.int16())
expected_ints = pa.array(expected_int_data, pa.int32())
expected_longs = pa.array(expected_long_data, pa.int64())
expected_floats = pa.array(expected_float_data, pa.float32())
expected_doubles = pa.array(expected_double_data, pa.float64())
expected_bools = pa.array(expected_bool_data, pa.bool_())
expected_strings = pa.array(expected_string_data, pa.string())
expected_date_times = pa.array(expected_date_time_data, pa.timestamp("ns", tz="UTC"))
expected_local_dates = pa.array(expected_local_date_data, pa.date64())
expected_local_times = pa.array(expected_local_time_data, pa.time64("ns"))

self.validate("Chars", expected_chars, added)
self.validate("Bytes", expected_bytes, added)
self.validate("Shorts", expected_shorts, added)
self.validate("Ints", expected_ints, added)
self.validate("Longs", expected_longs, added)
self.validate("Floats", expected_floats, added)
self.validate("Doubles", expected_doubles, added)
self.validate("Bools", expected_bools, added)
self.validate("Strings", expected_strings, added)
self.validate("DateTimes", expected_date_times, added)
self.validate("LocalDates", expected_local_dates, added)
self.validate("LocalTimes", expected_local_times, added)

self.queue.put("done")

def validate(self, what: str, expected: pa.Array, added):
actual = added[what]
if expected != actual:
self.errors.append(f"Column \"{what}\": expected={expected}, actual={actual}")

if __name__ == '__main__':
unittest.main()
6 changes: 3 additions & 3 deletions py/client-ticking/tests/test_ticking_basic.py
@@ -1,5 +1,5 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#

import unittest
@@ -11,8 +11,8 @@
class TickingBasicTestCase(unittest.TestCase):
def test_ticking_basic_time_table(self):
session = dh.Session()
half_second_in_nanos = 200 * 1000 * 1000
table = session.time_table(period=half_second_in_nanos).update(formulas=["Col1 = i"])
fifth_second_in_nanos = 200 * 1000 * 1000
table = session.time_table(period=fifth_second_in_nanos).update(formulas=["Col1 = i"])
session.bind_table(name="my_ticking_table", table=table)
table_added_last_col1_seen = -1
table_added_update_count = 0
9 changes: 9 additions & 0 deletions py/client/README.md
@@ -3,6 +3,15 @@

Deephaven Python Client is a Python package created by Deephaven Data Labs. It is a client API that allows Python applications to remotely access Deephaven data servers.

## `venv`

It's recommended to install into a Python virtual environment (venv). Use a command like the
one below to create a venv.

``` shell
python3 -m venv ~/py/dhenv
```

## Source Directory

### From the deephaven-core repository root
