Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read_sql in the dbc package #3434

Merged
merged 6 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Integrations/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# test-oriented local docker compose file to run redpanda and apicurio for testing
# test-oriented local docker compose file to run redpanda and postgres for testing

version: "3.4"

Expand Down
2 changes: 2 additions & 0 deletions docker/registry/cpp-client-base/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
io.deephaven.project.ProjectType=DOCKER_REGISTRY
deephaven.registry.imageName=ghcr.io/deephaven/cpp-client-base:latest
deephaven.registry.imageId=ghcr.io/deephaven/cpp-client-base@sha256:be329d3f792ddbf51b0d53a26f7a923a1b6e7990a9fffe8725cf7ba3f3e0da4a
# TODO(deephaven-base-images#54): arm64 native image for cpp-client-base
deephaven.registry.platform=linux/amd64
2 changes: 2 additions & 0 deletions docker/registry/protoc-base/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
io.deephaven.project.ProjectType=DOCKER_REGISTRY
deephaven.registry.imageName=ghcr.io/deephaven/protoc-base:latest
deephaven.registry.imageId=ghcr.io/deephaven/protoc-base@sha256:161d15724ae82ca0b4be788b4efc36d655361bd62b6c6b414def36d7f71c6150
# TODO(deephaven-base-images#55): arm64 native image for protoc-base
deephaven.registry.platform=linux/amd64
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
adbc-driver-manager==0.2.0
adbc-driver-postgresql==0.2.0
connectorx==0.3.1
connectorx==0.3.1; platform.machine == 'x86_64'
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
deephaven-plugin==0.3.0
java-utilities==0.2.0
jedi==0.18.2
Expand Down
2 changes: 1 addition & 1 deletion docker/server/src/main/server-netty/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
adbc-driver-manager==0.2.0
adbc-driver-postgresql==0.2.0
connectorx==0.3.1
connectorx==0.3.1; platform.machine == 'x86_64'
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
deephaven-plugin==0.3.0
java-utilities==0.2.0
jedi==0.18.2
Expand Down
1 change: 1 addition & 0 deletions py/server/deephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table
from .replay import TableReplayer
from ._gc import garbage_collect
from .dbc import read_sql
79 changes: 65 additions & 14 deletions py/server/deephaven/dbc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,85 @@
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""The dbc package includes the modules and functions for using external databases with Deephaven."""
from typing import Any

import deephaven.arrow as dharrow
from deephaven import DHError
from deephaven.table import Table
import deephaven.arrow as dharrow


def read_sql(conn: str, query: str) -> Table:
"""Executes the provided SQL query via ConnectorX and returns a Deephaven table.
def read_sql(conn: Any, query: str, driver: str = "connectorx") -> Table:
"""Executes the provided SQL query via a supported driver and returns a Deephaven table.

Args:
conn (str): a connection string URI, please refer to https://sfu-db.github.io/connector-x/databases.html for
database specific format
conn (Any): must either be a connection string for the given driver or a Turbodbc/ADBC DBAPI Connection object
query (str): SQL query statement
driver: (str): the driver to use, supported drivers are "odbc", "adbc", "connectorx", default is "connectorx"

Returns:
a new Table

Raises:
DHError
"""
try:
import connectorx as cx
except ImportError:
raise DHError(message="import ConnectorX failed, please install it first.")
if isinstance(conn, str):
if driver == "connectorx":
try:
import connectorx as cx
except ImportError:
raise DHError(message="import connectorx failed, please install it first.")

try:
pa_table = cx.read_sql(conn=conn, query=query, return_type="arrow")
return dharrow.to_table(pa_table)
except Exception as e:
raise DHError(e, message="failed to get a Arrow table from ConnectorX.") from e
elif driver == "odbc":
from deephaven.dbc.odbc import read_cursor
import turbodbc
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
with turbodbc.connect(connection_string=conn) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
return read_cursor(cursor)
elif driver == "adbc":
from deephaven.dbc.adbc import read_cursor
if not conn:
import adbc_driver_sqlite.dbapi as dbapi
elif conn.strip().startswith("postgresql"):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
import adbc_driver_postgresql.dbapi as dbapi
else:
raise DHError(message="not supported ADBC connection string")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

with dbapi.connect(conn) as dbconn:
with dbconn.cursor() as cursor:
cursor.execute(query)
return read_cursor(cursor)
else:
raise DHError(message=f"unsupported driver {driver}")
else:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
try:
import adbc_driver_manager.dbapi as dbapi
if isinstance(conn, dbapi.Connection):
from deephaven.dbc.adbc import read_cursor
with conn.cursor() as cursor:
cursor.execute(query)
return read_cursor(cursor)
except ImportError:
pass

try:
import turbodbc.connection
if isinstance(conn, turbodbc.connection.Connection):
from deephaven.dbc.odbc import read_cursor
with conn.cursor() as cursor:
cursor.execute(query)
return read_cursor(cursor)
except ImportError:
pass

raise DHError(message="invalid conn argument")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved




try:
pa_table = cx.read_sql(conn=conn, query=query, return_type="arrow")
except Exception as e:
raise DHError(e, message="failed to get a Arrow table from ConnectorX.") from e

return dharrow.to_table(pa_table)
53 changes: 48 additions & 5 deletions py/server/tests/test_dbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import turbodbc

from deephaven import DHError
from deephaven.dbc import odbc as dhodbc, adbc as dhadbc, read_sql
from deephaven import DHError, read_sql
from deephaven.dbc import odbc as dhodbc, adbc as dhadbc
from tests.testbase import BaseTestCase


Expand All @@ -34,14 +34,57 @@ def test_read_adbc(self):
# instead of the actual size
self.assertEqual(table.size, 10)

@unittest.skipIf(platform.machine() != "x86_64", reason="connectorx not available on Linux/Arm64")
def test_read_sql(self):
postgres_url = "postgresql://test:test@postgres:5432/test"
@unittest.skipIf(platform.machine() != "x86_64", reason="connectorx not available on Linux/Arm64"
"https://github.com/sfu-db/connector-x/issues/386")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
def test_read_sql_connectorx(self):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
query = "SELECT t_ts, t_id, t_instrument, t_exchange, t_price, t_size FROM CRYPTO_TRADES LIMIT 10"
postgres_url = "postgresql://test:test@postgres:5432/test"
dh_table = read_sql(conn=postgres_url, query=query)
self.assertEqual(len(dh_table.columns), 6)
self.assertEqual(dh_table.size, 10)

with self.assertRaises(DHError) as cm:
dh_table = read_sql(conn="garbage", query=query)

def test_read_sql(self):
query = "SELECT t_ts, t_id, t_instrument, t_exchange, t_price, t_size FROM CRYPTO_TRADES LIMIT 10"

with self.subTest("odbc"):
connection_string = 'Driver={PostgreSQL};Server=postgres;Port=5432;Database=test;Uid=test;Pwd=test;'
dh_table = read_sql(conn=connection_string, query=query, driver="odbc")
self.assertEqual(len(dh_table.columns), 6)
self.assertEqual(dh_table.size, 10)

with self.subTest("adbc"):
uri = "postgresql://postgres:5432/test?user=test&password=test"
dh_table = read_sql(conn=uri, query=query, driver="adbc")
self.assertEqual(len(dh_table.columns), 6)
self.assertEqual(dh_table.size, 10)

with self.subTest("odbc-connection"):
connection_string = 'Driver={PostgreSQL};Server=postgres;Port=5432;Database=test;Uid=test;Pwd=test;'
with turbodbc.connect(connection_string=connection_string) as conn:
dh_table = read_sql(conn=conn, query=query, driver="odbc")
self.assertEqual(len(dh_table.columns), 6)
self.assertEqual(dh_table.size, 10)

with self.subTest("adbc-connection"):
import adbc_driver_postgresql.dbapi
uri = "postgresql://postgres:5432/test?user=test&password=test"
with adbc_driver_postgresql.dbapi.connect(uri) as conn:
dh_table = read_sql(conn=conn, query=query, driver="adbc")
self.assertEqual(len(dh_table.columns), 6)
self.assertEqual(dh_table.size, 10)

with self.assertRaises(DHError) as cm:
dh_table = read_sql(conn=[None], query=query, driver="adbc")

with self.assertRaises(DHError) as cm:
dh_table = read_sql(conn="garbage", query=query, driver="adbc")

with self.assertRaises(Exception) as cm:
dh_table = read_sql(conn="garbage", query=query, driver="odbc")


if __name__ == '__main__':
unittest.main()