-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Minimalistic interface design to ingest ODBC data
- Loading branch information
1 parent
a76b161
commit 8b64035
Showing
2 changed files
with
70 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
"""This module supports ingesting data from external databases into Deephaven via database connectivity interfaces. | ||
Currently, the application must use Turbodbc (https://turbodbc.readthedocs.io/en/latest/index.html) to | ||
access a supported relational database. Turbodbc has built-in Apache Arrow support which enables Deephaven to | ||
achieve maximum performance and efficiency when ingesting relational data. | ||
Looking ahead, support for Arrow Database Connectivity (ADBC) https://arrow.apache.org/docs/dev/format/ADBC.html | ||
will be added once it reaches stable status. ADBC offers a standard API to fetch data in Arrow format from data | ||
sources that support Arrow natively as well as ODBC-only databases. | ||
""" | ||
from typing import Any | ||
|
||
from deephaven.table import Table | ||
from deephaven import arrow as dharrow | ||
from deephaven import DHError | ||
|
||
# Turbodbc is a hard requirement of this module: fail at import time with a DHError
# that carries the underlying ImportError, matching how read_cursor reports failures.
try:
    import turbodbc
except ImportError as e:
    raise DHError(e, message="import turbodbc failed") from e
|
||
|
||
def read_cursor(cursor: Any) -> Table:
    """Converts the result set of the provided cursor into a Deephaven table.

    Args:
        cursor (Any): a Turbodbc cursor. Prior to it being passed in, its execute() method must be called to run a
            query operation that produces a result set

    Returns:
        a new Table

    Raises:
        DHError: if the cursor is not a Turbodbc cursor, or if fetching the result set as an Arrow table fails
    """
    # Reject anything that is not a Turbodbc cursor up front, and say what was received so
    # the caller can tell which object was passed by mistake.
    if not isinstance(cursor, turbodbc.cursor.Cursor):
        raise DHError(message=f"expected a turbodbc.cursor.Cursor but got {type(cursor).__name__}.")

    # Only the fetch is guarded here; the Arrow-to-Deephaven conversion below is left to
    # report its own errors.
    try:
        pa_table = cursor.fetchallarrow()
    except Exception as e:
        raise DHError(e, message="failed to fetch ODBC result as a Arrow table.") from e

    return dharrow.to_table(pa_table)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
import unittest | ||
|
||
from tests.testbase import BaseTestCase | ||
from turbodbc import connect | ||
from deephaven import dbc as dhdb | ||
|
||
|
||
class DbcTestCase(BaseTestCase):
    """Tests for deephaven.dbc that run a query through a Turbodbc connection."""

    def test_read_db(self):
        """Reads 1000 rows through a Turbodbc cursor and checks the resulting table's size.

        NOTE(review): relies on a PostgreSQL ODBC data source on localhost with a populated
        CRYPTO_TRADES table — confirm the test environment provides it.
        """
        conn = connect(
            connection_string='Driver={PostgreSQL};Server=localhost;Port=5432;Database=test;Uid=test;Pwd=test;')
        # Close the cursor and the connection even when the query or an assertion fails,
        # so that a failing test does not leave ODBC resources open.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT t_ts, t_id, t_instrument, t_exchange, t_price FROM CRYPTO_TRADES LIMIT 1000")
                table = dhdb.read_cursor(cursor)
                self.assertEqual(table.size, 1000)
            finally:
                cursor.close()
        finally:
            conn.close()
|
||
|
||
# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()