From 663f0937080f0b8640c22a4363c9b8348652a9be Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Tue, 31 Jan 2023 15:41:34 -0700 Subject: [PATCH] Relational data ingestion via ADBC/ODBC (#3279) * Add test env setup (Postgres, ODBC) * Respond to review comments and test env changes * Provide more helpful error messages Remove unwanted comment * Switch to turbodbc-installed server base images * Apply suggestions from code review Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com> * Use latest server base images with arrow-11 * Reduce the test data size in repo --------- Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com> --- Integrations/build.gradle | 8 +- Integrations/docker-compose.yml | 7 ++ docker/registry/server-base/gradle.properties | 2 +- docker/registry/slim-base/gradle.properties | 2 +- .../src/main/server-jetty/requirements.txt | 2 +- .../src/main/server-netty/requirements.txt | 2 +- postgres/docker-compose.yml | 13 +++ postgres/src/Dockerfile | 5 ++ postgres/src/crypto_trades.csv | 88 +++++++++++++++++++ postgres/src/gen_test_data.sql | 14 +++ py/server/deephaven/dbc/__init__.py | 5 ++ py/server/deephaven/dbc/adbc.py | 47 ++++++++++ py/server/deephaven/dbc/odbc.py | 45 ++++++++++ py/server/tests/test_dbc.py | 37 ++++++++ 14 files changed, 271 insertions(+), 6 deletions(-) create mode 100644 postgres/docker-compose.yml create mode 100644 postgres/src/Dockerfile create mode 100644 postgres/src/crypto_trades.csv create mode 100644 postgres/src/gen_test_data.sql create mode 100644 py/server/deephaven/dbc/__init__.py create mode 100644 py/server/deephaven/dbc/adbc.py create mode 100644 py/server/deephaven/dbc/odbc.py create mode 100644 py/server/tests/test_dbc.py diff --git a/Integrations/build.gradle b/Integrations/build.gradle index f8ee69495f0..8109f21b8c2 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -67,8 +67,12 @@ def runInDocker = { String name, String sourcePath, List command, Closur // set up the container, env vars - things that aren't likely to change from 'deephaven/server-netty:local-build' runCommand '''set -eux; \\ - pip3 install pyarrow unittest-xml-reporting==3.0.4;\\ - mkdir -p /out/report''' + pip3 install unittest-xml-reporting==3.0.4; \\ + mkdir -p /out/report; \\ + echo "[PostgreSQL]" > /etc/odbcinst.ini; \\ + echo "Driver = /usr/lib/x86_64-linux-gnu/odbc/psqlodbcw.so" >> /etc/odbcinst.ini; \\ + echo "Threading = 2" >> /etc/odbcinst.ini''' + volume '/data' volume '/cache' environmentVariable 'DEEPHAVEN_CLASSPATH', '/classpath/*:/classpath:/opt/deephaven/server/lib/*:/opt/deephaven/server/lib/' diff --git a/Integrations/docker-compose.yml b/Integrations/docker-compose.yml index 0d49a164c59..d2e5ad15415 100644 --- a/Integrations/docker-compose.yml +++ b/Integrations/docker-compose.yml @@ -10,3 +10,10 @@ services: expose: - 29092 - 8081 + + postgres: + extends: + file: ../postgres/docker-compose.yml + service: postgres + expose: + - 5432 diff --git a/docker/registry/server-base/gradle.properties b/docker/registry/server-base/gradle.properties index a6377dd1d88..b59e430d3ed 100644 --- a/docker/registry/server-base/gradle.properties +++ b/docker/registry/server-base/gradle.properties @@ -1,3 +1,3 @@ io.deephaven.project.ProjectType=DOCKER_REGISTRY deephaven.registry.imageName=ghcr.io/deephaven/server-base:edge -deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:c9413a0b02287ccd16dd586c67fa42f526e68011d879795f5abd104c76ce4b42 +deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:851d9ca940b9c3d779cbed459ed83d1866975a206b6fbdd364e41b63582ec5e6 diff --git a/docker/registry/slim-base/gradle.properties b/docker/registry/slim-base/gradle.properties index 8206e1fb010..309073a6c63 100644 --- a/docker/registry/slim-base/gradle.properties +++ b/docker/registry/slim-base/gradle.properties @@ -1,3 +1,3 @@ io.deephaven.project.ProjectType=DOCKER_REGISTRY deephaven.registry.imageName=ghcr.io/deephaven/server-slim-base:edge -deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:9db84913893f373158244327cb1c0ea0b1085908bfccf13ca679706585923525 +deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:5b79fe44fd5352a89a1291b9d886325037f85d089787b648e6cb7f3d95d7a0df diff --git a/docker/server-jetty/src/main/server-jetty/requirements.txt b/docker/server-jetty/src/main/server-jetty/requirements.txt index 01697c74457..af72bf60bd0 100644 --- a/docker/server-jetty/src/main/server-jetty/requirements.txt +++ b/docker/server-jetty/src/main/server-jetty/requirements.txt @@ -7,7 +7,7 @@ numba==0.56.4 numpy==1.23.5 pandas==1.5.3 parso==0.8.3 -pyarrow==10.0.1 +pyarrow==11.0.0 python-dateutil==2.8.2 pytz==2022.7.1 six==1.16.0 diff --git a/docker/server/src/main/server-netty/requirements.txt b/docker/server/src/main/server-netty/requirements.txt index 01697c74457..af72bf60bd0 100644 --- a/docker/server/src/main/server-netty/requirements.txt +++ b/docker/server/src/main/server-netty/requirements.txt @@ -7,7 +7,7 @@ numba==0.56.4 numpy==1.23.5 pandas==1.5.3 parso==0.8.3 -pyarrow==10.0.1 +pyarrow==11.0.0 python-dateutil==2.8.2 pytz==2022.7.1 six==1.16.0 diff --git a/postgres/docker-compose.yml b/postgres/docker-compose.yml new file mode 100644 index 00000000000..b90d18fda51 --- /dev/null +++ b/postgres/docker-compose.yml @@ -0,0 +1,13 @@ +version: "3.4" + +services: + postgres: + build: + context: src + hostname: postgres + ports: + - 5432:5432 + environment: + - POSTGRES_PASSWORD=test + - POSTGRES_USER=test + command: postgres -c fsync=off -c synchronous_commit=off diff --git a/postgres/src/Dockerfile b/postgres/src/Dockerfile new file mode 100644 index 00000000000..396ba911580 --- /dev/null +++ b/postgres/src/Dockerfile @@ -0,0 +1,5 @@ +FROM postgres:latest +RUN mkdir -p /tmp/psql_data/ + +COPY crypto_trades.csv /tmp/psql_data/ +COPY gen_test_data.sql /docker-entrypoint-initdb.d/ diff --git a/postgres/src/crypto_trades.csv b/postgres/src/crypto_trades.csv new file mode 100644 index 00000000000..bbf73397b83 --- /dev/null +++ b/postgres/src/crypto_trades.csv @@ -0,0 +1,88 @@ +"Date", "Timestamp", "Id", "Instrument", "Exchange", "Price", "Size" +"2021-09-22","1632326411780000000","609511672","ETH/USD","binance","3009.420000",".006300" +"2021-09-22","1632326411783000000","1064350098","BTC/USD","binance","43229.460000",".000430" +"2021-09-22","1632326411858000000","158876549","ETH/USD","coinbase-pro","3011.110000",".003305" +"2021-09-22","1632326411918000000","1064350099","BTC/USD","binance","43229.450000",".001290" +"2021-09-22","1632326411933000000","199242450","BTC/USD","bitstamp","43259.000000",".020000" +"2021-09-22","1632326411949000000","1064350100","BTC/USD","binance","43229.460000",".000010" +"2021-09-22","1632326411949000000","1064350101","BTC/USD","binance","43234.000000",".005220" +"2021-09-22","1632326411963000000","1064350102","BTC/USD","binance","43229.460000",".016680" +"2021-09-22","1632326411977000000","1064350103","BTC/USD","binance","43234.160000",".000580" +"2021-09-22","1632326412030000000","214133126","BTC/USD","coinbase-pro","43249.320000",".018136" +"2021-09-22","1632326412023000000","297608806","ETH/BTC","binance",".069603",".097100" +"2021-09-22","1632326412037000000","1064350104","BTC/USD","binance","43234.160000",".004260" +"2021-09-22","1632326412066000000","1064350105","BTC/USD","binance","43234.160000",".000010" +"2021-09-22","1632326412066000000","1064350106","BTC/USD","binance","43234.430000",".002160" +"2021-09-22","1632326412066000000","1064350107","BTC/USD","binance","43236.720000",".001040" +"2021-09-22","1632326412085000000","1064350108","BTC/USD","binance","43234.160000",".005770" +"2021-09-22","1632326412099000000","1064350109","BTC/USD","binance","43234.150000",".016680" +"2021-09-22","1632326412142000000","214133127","BTC/USD","coinbase-pro","43249.320000",".002232" +"2021-09-22","1632326412205000000","214133128","BTC/USD","coinbase-pro","43249.320000",".095200" +"2021-09-22","1632326412174000000","1064350110","BTC/USD","binance","43234.160000",".000230" +"2021-09-22","1632326412174000000","1064350111","BTC/USD","binance","43234.160000",".042950" +"2021-09-22","1632326412221000000","1064350112","BTC/USD","binance","43234.160000",".002310" +"2021-09-22","1632326412228000000","1064350113","BTC/USD","binance","43234.160000",".002340" +"2021-09-22","1632326412289000000","1064350114","BTC/USD","binance","43230.780000",".003900" +"2021-09-22","1632326412296000000","1064350115","BTC/USD","binance","43230.770000",".016680" +"2021-09-22","1632326412349000000","297608807","ETH/BTC","binance",".069600",".801800" +"2021-09-22","1632326412404000000","214133129","BTC/USD","coinbase-pro","43249.320000",".000050" +"2021-09-22","1632326412404000000","214133130","BTC/USD","coinbase-pro","43249.320000",".003759" +"2021-09-22","1632326412404000000","214133131","BTC/USD","coinbase-pro","43249.320000",".115619" +"2021-09-22","1632326412404000000","214133132","BTC/USD","coinbase-pro","43249.330000",".003759" +"2021-09-22","1632326412404000000","214133133","BTC/USD","coinbase-pro","43250.750000",".304303" +"2021-09-22","1632326412359000000","1064350116","BTC/USD","binance","43229.450000",".016680" +"2021-09-22","1632326412404000000","214133134","BTC/USD","coinbase-pro","43251.690000",".105009" +"2021-09-22","1632326412360000000","609511673","ETH/USD","binance","3009.420000",".796500" +"2021-09-22","1632326412360000000","609511674","ETH/USD","binance","3009.420000",".005300" +"2021-09-22","1632326412360000000","1064350117","BTC/USD","binance","43233.170000",".055800" +"2021-09-22","1632326412414000000","1064350118","BTC/USD","binance","43229.460000",".001170" +"2021-09-22","1632326412000000000","59114904516","ETH/USD","gemini","3011.550000",".204000" +"2021-09-22","1632326412000000000","59114904518","ETH/USD","gemini","3011.830000",".060712" +"2021-09-22","1632326412479000000","609511675","ETH/USD","binance","3009.430000",".006600" +"2021-09-22","1632326412480000000","609511676","ETH/USD","binance","3009.430000","1.477100" +"2021-09-22","1632326412480000000","609511677","ETH/USD","binance","3009.430000",".344500" +"2021-09-22","1632326412488000000","1064350119","BTC/USD","binance","43230.000000",".000450" +"2021-09-22","1632326412498000000","1064350120","BTC/USD","binance","43229.990000",".000600" +"2021-09-22","1632326412499000000","609511678","ETH/USD","binance","3009.600000",".259000" +"2021-09-22","1632326412554000000","158876550","ETH/USD","coinbase-pro","3011.240000","1.574570" +"2021-09-22","1632326412559000000","158876551","ETH/USD","coinbase-pro","3011.310000",".157507" +"2021-09-22","1632326412559000000","158876552","ETH/USD","coinbase-pro","3011.320000",".005362" +"2021-09-22","1632326412574000000","158876553","ETH/USD","coinbase-pro","3011.320000",".152145" +"2021-09-22","1632326412542000000","297608808","ETH/BTC","binance",".069610",".338300" +"2021-09-22","1632326412544000000","609511679","ETH/USD","binance","3009.600000","1.341000" +"2021-09-22","1632326412594000000","214133135","BTC/USD","coinbase-pro","43251.690000",".000207" +"2021-09-22","1632326412552000000","609511680","ETH/USD","binance","3009.590000",".338300" +"2021-09-22","1632326412552000000","1064350121","BTC/USD","binance","43230.000000",".023540" +"2021-09-22","1632326412588000000","1064350122","BTC/USD","binance","43230.000000",".000010" +"2021-09-22","1632326412618000000","609511681","ETH/USD","binance","3009.600000",".108800" +"2021-09-22","1632326412619000000","609511682","ETH/USD","binance","3009.870000",".332400" +"2021-09-22","1632326412717000000","214133136","BTC/USD","coinbase-pro","43251.690000",".002213" +"2021-09-22","1632326412728000000","1064350123","BTC/USD","binance","43234.160000",".004910" +"2021-09-22","1632326412749000000","1064350124","BTC/USD","binance","43234.160000",".001150" +"2021-09-22","1632326412000000000","837177433","BTC/USD","bitfinex","43236.000000",".000628" +"2021-09-22","1632326412000000000","837177434","BTC/USD","bitfinex","43236.000000",".199372" +"2021-09-22","1632326412763000000","1064350125","BTC/USD","binance","43234.160000",".000700" +"2021-09-22","1632326412763000000","1064350126","BTC/USD","binance","43237.240000",".005320" +"2021-09-22","1632326412811000000","609511683","ETH/USD","binance","3009.630000","2.400000" +"2021-09-22","1632326412811000000","609511684","ETH/USD","binance","3009.740000","4.000000" +"2021-09-22","1632326412811000000","609511685","ETH/USD","binance","3009.870000","1.267600" +"2021-09-22","1632326412918000000","214133137","BTC/USD","coinbase-pro","43251.690000",".050000" +"2021-09-22","1632326412985000000","158876554","ETH/USD","coinbase-pro","3011.770000",".006608" +"2021-09-22","1632326412932000000","1064350127","BTC/USD","binance","43236.810000",".005150" +"2021-09-22","1632326412955000000","1064350128","BTC/USD","binance","43236.810000",".014800" +"2021-09-22","1632326412960000000","1064350129","BTC/USD","binance","43236.810000",".006680" +"2021-09-22","1632326413023000000","609511686","ETH/USD","binance","3010.000000",".590600" +"2021-09-22","1632326413063000000","297608809","ETH/BTC","binance",".069610",".110700" +"2021-09-22","1632326413000000000","837177435","BTC/USD","bitfinex","43236.000000",".004790" +"2021-09-22","1632326413141000000","609511687","ETH/USD","binance","3010.000000",".044900" +"2021-09-22","1632326413213000000","214133138","BTC/USD","coinbase-pro","43251.690000",".000269" +"2021-09-22","1632326413163000000","609511688","ETH/USD","binance","3010.010000",".131200" +"2021-09-22","1632326413251000000","1064350130","BTC/USD","binance","43236.810000",".000510" +"2021-09-22","1632326413264000000","609511689","ETH/USD","binance","3010.000000",".005000" +"2021-09-22","1632326413288000000","609511690","ETH/USD","binance","3010.000000",".022600" +"2021-09-22","1632326413317000000","297608810","ETH/BTC","binance",".069611",".096100" +"2021-09-22","1632326413384000000","13128764","DOGE/USD","coinbase-pro",".215400","62.400000" +"2021-09-22","1632326413424000000","214133139","BTC/USD","coinbase-pro","43251.690000",".062487" +"2021-09-22","1632326413424000000","214133140","BTC/USD","coinbase-pro","43251.690000",".001730" +"2021-09-22","1632326413424000000","214133141","BTC/USD","coinbase-pro","43251.690000",".003759" +"2021-09-22","1632326413424000000","214133142","BTC/USD","coinbase-pro","43251.690000",".115624" diff --git a/postgres/src/gen_test_data.sql b/postgres/src/gen_test_data.sql new file mode 100644 index 00000000000..9c336398bb0 --- /dev/null +++ b/postgres/src/gen_test_data.sql @@ -0,0 +1,14 @@ +CREATE TABLE CRYPTO_TRADES( + T_DATE date, + T_TS bigint, + T_ID varchar(20), + T_INSTRUMENT varchar(10), + T_EXCHANGE text, + T_PRICE double precision, + T_SIZE real +); + +COPY CRYPTO_TRADES +FROM '/tmp/psql_data/crypto_trades.csv' +DELIMITER ',' +CSV HEADER; diff --git a/py/server/deephaven/dbc/__init__.py b/py/server/deephaven/dbc/__init__.py new file mode 100644 index 00000000000..5f3c8d15418 --- /dev/null +++ b/py/server/deephaven/dbc/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# + +"""The dbc package includes the modules for using external databases with Deephaven.""" diff --git a/py/server/deephaven/dbc/adbc.py b/py/server/deephaven/dbc/adbc.py new file mode 100644 index 00000000000..b14e5fc5d49 --- /dev/null +++ b/py/server/deephaven/dbc/adbc.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +"""This module supports ingesting data from external databases (relational and other types) into Deephaven via the +Python DB-API 2.0 (PEP 249) and the Apache Arrow Database Connectivity (ADBC) interfaces. (Please refer to +https://arrow.apache.org/docs/dev/format/ADBC.html for more details on ADBC). + +ADBC defines a standard API to fetch data in Arrow format from databases that support Arrow natively as well as +from databases that only support ODBC/JDBC. By relying on ADBC, Deephaven is able to ingest data efficiently from a +wide variety of data sources. """ + +from typing import Any + +from deephaven.table import Table +from deephaven import arrow as dharrow +from deephaven import DHError + +try: + import adbc_driver_manager.dbapi +except ImportError: + raise DHError(message="import ADBC driver manager failed, please install ADBC driver manager and your " + "targeted database driver first.") + + +def read_cursor(cursor: adbc_driver_manager.dbapi.Cursor) -> Table: + """Converts the result set of the provided cursor into a Deephaven table. + + Args: + cursor (Any): an ADBC DB-API cursor. Prior to it being passed in, its execute() method must be called to + run a query operation that produces an Arrow table + + Returns: + a new Table + + Raises: + DHError, TypeError + """ + + if not isinstance(cursor, adbc_driver_manager.dbapi.Cursor): + raise TypeError(f"expect {adbc_driver_manager.dbapi.Cursor} got {type(cursor)} instead.") + + try: + pa_table = cursor.fetch_arrow_table() + except Exception as e: + raise DHError(e, message="failed to fetch ADBC result as an Arrow table.") from e + + return dharrow.to_table(pa_table) diff --git a/py/server/deephaven/dbc/odbc.py b/py/server/deephaven/dbc/odbc.py new file mode 100644 index 00000000000..1de6b262f1f --- /dev/null +++ b/py/server/deephaven/dbc/odbc.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +"""This module supports ingesting data from external relational databases into Deephaven via the Python DB-API 2.0 +(PEP 249) and the Open Database Connectivity (ODBC) interfaces by using the Turbodbc module. + +Turbodbc is DB-API 2.0 compliant, provides access to relational databases via the ODBC interface and more +importantly it has optimized, built-in Apache Arrow support when fetching ODBC result sets. This enables Deephaven to +achieve maximum efficiency when ingesting relational data. """ + +from typing import Any + +from deephaven import DHError +from deephaven import arrow as dharrow +from deephaven.table import Table + +try: + import turbodbc.cursor +except ImportError: + raise DHError(message="import turbodbc failed, please install turbodbc, the ODBC driver manager, and the targeted " + "database driver first.") + + +def read_cursor(cursor: turbodbc.cursor.Cursor) -> Table: + """Converts the result set of the provided cursor into a Deephaven table. + + Args: + cursor (Any): a Turbodbc cursor. Prior to it being passed in, its execute() method must be called to run a query + operation that produces a result set + + Returns: + a new Table + + Raises: + DHError, TypeError + """ + + if not isinstance(cursor, turbodbc.cursor.Cursor): + raise TypeError(f"expect {turbodbc.cursor.Cursor} got {type(cursor)} instead.") + try: + pa_table = cursor.fetchallarrow() + except Exception as e: + raise DHError(e, message="failed to fetch ODBC result as a Arrow table.") from e + + return dharrow.to_table(pa_table) diff --git a/py/server/tests/test_dbc.py b/py/server/tests/test_dbc.py new file mode 100644 index 00000000000..f2fe429e819 --- /dev/null +++ b/py/server/tests/test_dbc.py @@ -0,0 +1,37 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +import unittest + +import turbodbc + +from deephaven.dbc import odbc as dhodbc, adbc as dhadbc +from tests.testbase import BaseTestCase + + +# noinspection SqlDialectInspection +class DbcTestCase(BaseTestCase): + + def test_read_odbc(self): + connection_string = 'Driver={PostgreSQL};Server=postgres;Port=5432;Database=test;Uid=test;Pwd=test;' + with turbodbc.connect(connection_string=connection_string) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT t_ts, t_id, t_instrument, t_exchange, t_price, t_size FROM CRYPTO_TRADES") + table = dhodbc.read_cursor(cursor) + self.assertEqual(table.size, cursor.rowcount) + + def test_read_adbc(self): + import adbc_driver_postgresql.dbapi + + uri = "postgresql://postgres:5432/test?user=test&password=test" + with adbc_driver_postgresql.dbapi.connect(uri) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT t_ts, t_exchange, t_price, t_size FROM CRYPTO_TRADES LIMIT 10") + table = dhadbc.read_cursor(cursor) + # This is not ideal but ADBC might have a bug regarding cursor.rowcount it currently returns -1 + # instead of the actual size + self.assertEqual(table.size, 10) + + +if __name__ == '__main__': + unittest.main()