-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Relational data ingestion via ADBC/ODBC (#3279)
* Add test env setup (Postgres, ODBC) * Respond to review comments and test env changes * Provide more helpful error messages Remove unwanted comment * Switch to turbodbc-installed server base images * Apply suggestions from code review Co-authored-by: Chip Kent <[email protected]> * Use latest server base images with arrow-11 * Reduce the test data size in repo --------- Co-authored-by: Chip Kent <[email protected]>
- Loading branch information
1 parent
c0ff305
commit 663f093
Showing
14 changed files
with
271 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
io.deephaven.project.ProjectType=DOCKER_REGISTRY | ||
deephaven.registry.imageName=ghcr.io/deephaven/server-base:edge | ||
deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:c9413a0b02287ccd16dd586c67fa42f526e68011d879795f5abd104c76ce4b42 | ||
deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:851d9ca940b9c3d779cbed459ed83d1866975a206b6fbdd364e41b63582ec5e6 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
io.deephaven.project.ProjectType=DOCKER_REGISTRY | ||
deephaven.registry.imageName=ghcr.io/deephaven/server-slim-base:edge | ||
deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:9db84913893f373158244327cb1c0ea0b1085908bfccf13ca679706585923525 | ||
deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:5b79fe44fd5352a89a1291b9d886325037f85d089787b648e6cb7f3d95d7a0df |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
version: "3.4" | ||
|
||
services: | ||
postgres: | ||
build: | ||
context: src | ||
hostname: postgres | ||
ports: | ||
- 5432:5432 | ||
environment: | ||
- POSTGRES_PASSWORD=test | ||
- POSTGRES_USER=test | ||
command: postgres -c fsync=off -c synchronous_commit=off |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
FROM postgres:latest | ||
RUN mkdir -p /tmp/psql_data/ | ||
|
||
COPY crypto_trades.csv /tmp/psql_data/ | ||
COPY gen_test_data.sql /docker-entrypoint-initdb.d/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
"Date", "Timestamp", "Id", "Instrument", "Exchange", "Price", "Size" | ||
"2021-09-22","1632326411780000000","609511672","ETH/USD","binance","3009.420000",".006300" | ||
"2021-09-22","1632326411783000000","1064350098","BTC/USD","binance","43229.460000",".000430" | ||
"2021-09-22","1632326411858000000","158876549","ETH/USD","coinbase-pro","3011.110000",".003305" | ||
"2021-09-22","1632326411918000000","1064350099","BTC/USD","binance","43229.450000",".001290" | ||
"2021-09-22","1632326411933000000","199242450","BTC/USD","bitstamp","43259.000000",".020000" | ||
"2021-09-22","1632326411949000000","1064350100","BTC/USD","binance","43229.460000",".000010" | ||
"2021-09-22","1632326411949000000","1064350101","BTC/USD","binance","43234.000000",".005220" | ||
"2021-09-22","1632326411963000000","1064350102","BTC/USD","binance","43229.460000",".016680" | ||
"2021-09-22","1632326411977000000","1064350103","BTC/USD","binance","43234.160000",".000580" | ||
"2021-09-22","1632326412030000000","214133126","BTC/USD","coinbase-pro","43249.320000",".018136" | ||
"2021-09-22","1632326412023000000","297608806","ETH/BTC","binance",".069603",".097100" | ||
"2021-09-22","1632326412037000000","1064350104","BTC/USD","binance","43234.160000",".004260" | ||
"2021-09-22","1632326412066000000","1064350105","BTC/USD","binance","43234.160000",".000010" | ||
"2021-09-22","1632326412066000000","1064350106","BTC/USD","binance","43234.430000",".002160" | ||
"2021-09-22","1632326412066000000","1064350107","BTC/USD","binance","43236.720000",".001040" | ||
"2021-09-22","1632326412085000000","1064350108","BTC/USD","binance","43234.160000",".005770" | ||
"2021-09-22","1632326412099000000","1064350109","BTC/USD","binance","43234.150000",".016680" | ||
"2021-09-22","1632326412142000000","214133127","BTC/USD","coinbase-pro","43249.320000",".002232" | ||
"2021-09-22","1632326412205000000","214133128","BTC/USD","coinbase-pro","43249.320000",".095200" | ||
"2021-09-22","1632326412174000000","1064350110","BTC/USD","binance","43234.160000",".000230" | ||
"2021-09-22","1632326412174000000","1064350111","BTC/USD","binance","43234.160000",".042950" | ||
"2021-09-22","1632326412221000000","1064350112","BTC/USD","binance","43234.160000",".002310" | ||
"2021-09-22","1632326412228000000","1064350113","BTC/USD","binance","43234.160000",".002340" | ||
"2021-09-22","1632326412289000000","1064350114","BTC/USD","binance","43230.780000",".003900" | ||
"2021-09-22","1632326412296000000","1064350115","BTC/USD","binance","43230.770000",".016680" | ||
"2021-09-22","1632326412349000000","297608807","ETH/BTC","binance",".069600",".801800" | ||
"2021-09-22","1632326412404000000","214133129","BTC/USD","coinbase-pro","43249.320000",".000050" | ||
"2021-09-22","1632326412404000000","214133130","BTC/USD","coinbase-pro","43249.320000",".003759" | ||
"2021-09-22","1632326412404000000","214133131","BTC/USD","coinbase-pro","43249.320000",".115619" | ||
"2021-09-22","1632326412404000000","214133132","BTC/USD","coinbase-pro","43249.330000",".003759" | ||
"2021-09-22","1632326412404000000","214133133","BTC/USD","coinbase-pro","43250.750000",".304303" | ||
"2021-09-22","1632326412359000000","1064350116","BTC/USD","binance","43229.450000",".016680" | ||
"2021-09-22","1632326412404000000","214133134","BTC/USD","coinbase-pro","43251.690000",".105009" | ||
"2021-09-22","1632326412360000000","609511673","ETH/USD","binance","3009.420000",".796500" | ||
"2021-09-22","1632326412360000000","609511674","ETH/USD","binance","3009.420000",".005300" | ||
"2021-09-22","1632326412360000000","1064350117","BTC/USD","binance","43233.170000",".055800" | ||
"2021-09-22","1632326412414000000","1064350118","BTC/USD","binance","43229.460000",".001170" | ||
"2021-09-22","1632326412000000000","59114904516","ETH/USD","gemini","3011.550000",".204000" | ||
"2021-09-22","1632326412000000000","59114904518","ETH/USD","gemini","3011.830000",".060712" | ||
"2021-09-22","1632326412479000000","609511675","ETH/USD","binance","3009.430000",".006600" | ||
"2021-09-22","1632326412480000000","609511676","ETH/USD","binance","3009.430000","1.477100" | ||
"2021-09-22","1632326412480000000","609511677","ETH/USD","binance","3009.430000",".344500" | ||
"2021-09-22","1632326412488000000","1064350119","BTC/USD","binance","43230.000000",".000450" | ||
"2021-09-22","1632326412498000000","1064350120","BTC/USD","binance","43229.990000",".000600" | ||
"2021-09-22","1632326412499000000","609511678","ETH/USD","binance","3009.600000",".259000" | ||
"2021-09-22","1632326412554000000","158876550","ETH/USD","coinbase-pro","3011.240000","1.574570" | ||
"2021-09-22","1632326412559000000","158876551","ETH/USD","coinbase-pro","3011.310000",".157507" | ||
"2021-09-22","1632326412559000000","158876552","ETH/USD","coinbase-pro","3011.320000",".005362" | ||
"2021-09-22","1632326412574000000","158876553","ETH/USD","coinbase-pro","3011.320000",".152145" | ||
"2021-09-22","1632326412542000000","297608808","ETH/BTC","binance",".069610",".338300" | ||
"2021-09-22","1632326412544000000","609511679","ETH/USD","binance","3009.600000","1.341000" | ||
"2021-09-22","1632326412594000000","214133135","BTC/USD","coinbase-pro","43251.690000",".000207" | ||
"2021-09-22","1632326412552000000","609511680","ETH/USD","binance","3009.590000",".338300" | ||
"2021-09-22","1632326412552000000","1064350121","BTC/USD","binance","43230.000000",".023540" | ||
"2021-09-22","1632326412588000000","1064350122","BTC/USD","binance","43230.000000",".000010" | ||
"2021-09-22","1632326412618000000","609511681","ETH/USD","binance","3009.600000",".108800" | ||
"2021-09-22","1632326412619000000","609511682","ETH/USD","binance","3009.870000",".332400" | ||
"2021-09-22","1632326412717000000","214133136","BTC/USD","coinbase-pro","43251.690000",".002213" | ||
"2021-09-22","1632326412728000000","1064350123","BTC/USD","binance","43234.160000",".004910" | ||
"2021-09-22","1632326412749000000","1064350124","BTC/USD","binance","43234.160000",".001150" | ||
"2021-09-22","1632326412000000000","837177433","BTC/USD","bitfinex","43236.000000",".000628" | ||
"2021-09-22","1632326412000000000","837177434","BTC/USD","bitfinex","43236.000000",".199372" | ||
"2021-09-22","1632326412763000000","1064350125","BTC/USD","binance","43234.160000",".000700" | ||
"2021-09-22","1632326412763000000","1064350126","BTC/USD","binance","43237.240000",".005320" | ||
"2021-09-22","1632326412811000000","609511683","ETH/USD","binance","3009.630000","2.400000" | ||
"2021-09-22","1632326412811000000","609511684","ETH/USD","binance","3009.740000","4.000000" | ||
"2021-09-22","1632326412811000000","609511685","ETH/USD","binance","3009.870000","1.267600" | ||
"2021-09-22","1632326412918000000","214133137","BTC/USD","coinbase-pro","43251.690000",".050000" | ||
"2021-09-22","1632326412985000000","158876554","ETH/USD","coinbase-pro","3011.770000",".006608" | ||
"2021-09-22","1632326412932000000","1064350127","BTC/USD","binance","43236.810000",".005150" | ||
"2021-09-22","1632326412955000000","1064350128","BTC/USD","binance","43236.810000",".014800" | ||
"2021-09-22","1632326412960000000","1064350129","BTC/USD","binance","43236.810000",".006680" | ||
"2021-09-22","1632326413023000000","609511686","ETH/USD","binance","3010.000000",".590600" | ||
"2021-09-22","1632326413063000000","297608809","ETH/BTC","binance",".069610",".110700" | ||
"2021-09-22","1632326413000000000","837177435","BTC/USD","bitfinex","43236.000000",".004790" | ||
"2021-09-22","1632326413141000000","609511687","ETH/USD","binance","3010.000000",".044900" | ||
"2021-09-22","1632326413213000000","214133138","BTC/USD","coinbase-pro","43251.690000",".000269" | ||
"2021-09-22","1632326413163000000","609511688","ETH/USD","binance","3010.010000",".131200" | ||
"2021-09-22","1632326413251000000","1064350130","BTC/USD","binance","43236.810000",".000510" | ||
"2021-09-22","1632326413264000000","609511689","ETH/USD","binance","3010.000000",".005000" | ||
"2021-09-22","1632326413288000000","609511690","ETH/USD","binance","3010.000000",".022600" | ||
"2021-09-22","1632326413317000000","297608810","ETH/BTC","binance",".069611",".096100" | ||
"2021-09-22","1632326413384000000","13128764","DOGE/USD","coinbase-pro",".215400","62.400000" | ||
"2021-09-22","1632326413424000000","214133139","BTC/USD","coinbase-pro","43251.690000",".062487" | ||
"2021-09-22","1632326413424000000","214133140","BTC/USD","coinbase-pro","43251.690000",".001730" | ||
"2021-09-22","1632326413424000000","214133141","BTC/USD","coinbase-pro","43251.690000",".003759" | ||
"2021-09-22","1632326413424000000","214133142","BTC/USD","coinbase-pro","43251.690000",".115624" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
CREATE TABLE CRYPTO_TRADES( | ||
T_DATE date, | ||
T_TS bigint, | ||
T_ID varchar(20), | ||
T_INSTRUMENT varchar(10), | ||
T_EXCHANGE text, | ||
T_PRICE double precision, | ||
T_SIZE real | ||
); | ||
|
||
COPY CRYPTO_TRADES | ||
FROM '/tmp/psql_data/crypto_trades.csv' | ||
DELIMITER ',' | ||
CSV HEADER; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
|
||
"""The dbc package includes the modules for using external databases with Deephaven.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
"""This module supports ingesting data from external databases (relational and other types) into Deephaven via the | ||
Python DB-API 2.0 (PEP 249) and the Apache Arrow Database Connectivity (ADBC) interfaces. (Please refer to | ||
https://arrow.apache.org/docs/dev/format/ADBC.html for more details on ADBC). | ||
ADBC defines a standard API to fetch data in Arrow format from databases that support Arrow natively as well as | ||
from databases that only support ODBC/JDBC. By relying on ADBC, Deephaven is able to ingest data efficiently from a | ||
wide variety of data sources. """ | ||
|
||
from typing import Any | ||
|
||
from deephaven.table import Table | ||
from deephaven import arrow as dharrow | ||
from deephaven import DHError | ||
|
||
try: | ||
import adbc_driver_manager.dbapi | ||
except ImportError: | ||
raise DHError(message="import ADBC driver manager failed, please install ADBC driver manager and your " | ||
"targeted database driver first.") | ||
|
||
|
||
def read_cursor(cursor: adbc_driver_manager.dbapi.Cursor) -> Table: | ||
"""Converts the result set of the provided cursor into a Deephaven table. | ||
Args: | ||
cursor (Any): an ADBC DB-API cursor. Prior to it being passed in, its execute() method must be called to | ||
run a query operation that produces an Arrow table | ||
Returns: | ||
a new Table | ||
Raises: | ||
DHError, TypeError | ||
""" | ||
|
||
if not isinstance(cursor, adbc_driver_manager.dbapi.Cursor): | ||
raise TypeError(f"expect {adbc_driver_manager.dbapi.Cursor} got {type(cursor)} instead.") | ||
|
||
try: | ||
pa_table = cursor.fetch_arrow_table() | ||
except Exception as e: | ||
raise DHError(e, message="failed to fetch ADBC result as an Arrow table.") from e | ||
|
||
return dharrow.to_table(pa_table) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
"""This module supports ingesting data from external relational databases into Deephaven via the Python DB-API 2.0 | ||
(PEP 249) and the Open Database Connectivity (ODBC) interfaces by using the Turbodbc module. | ||
Turbodbc is DB-API 2.0 compliant, provides access to relational databases via the ODBC interface and more | ||
importantly it has optimized, built-in Apache Arrow support when fetching ODBC result sets. This enables Deephaven to | ||
achieve maximum efficiency when ingesting relational data. """ | ||
|
||
from typing import Any | ||
|
||
from deephaven import DHError | ||
from deephaven import arrow as dharrow | ||
from deephaven.table import Table | ||
|
||
try: | ||
import turbodbc.cursor | ||
except ImportError: | ||
raise DHError(message="import turbodbc failed, please install turbodbc, the ODBC driver manager, and the targeted " | ||
"database driver first.") | ||
|
||
|
||
def read_cursor(cursor: turbodbc.cursor.Cursor) -> Table: | ||
"""Converts the result set of the provided cursor into a Deephaven table. | ||
Args: | ||
cursor (Any): a Turbodbc cursor. Prior to it being passed in, its execute() method must be called to run a query | ||
operation that produces a result set | ||
Returns: | ||
a new Table | ||
Raises: | ||
DHError, TypeError | ||
""" | ||
|
||
if not isinstance(cursor, turbodbc.cursor.Cursor): | ||
raise TypeError(f"expect {turbodbc.cursor.Cursor} got {type(cursor)} instead.") | ||
try: | ||
pa_table = cursor.fetchallarrow() | ||
except Exception as e: | ||
raise DHError(e, message="failed to fetch ODBC result as a Arrow table.") from e | ||
|
||
return dharrow.to_table(pa_table) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# | ||
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending | ||
# | ||
import unittest | ||
|
||
import turbodbc | ||
|
||
from deephaven.dbc import odbc as dhodbc, adbc as dhadbc | ||
from tests.testbase import BaseTestCase | ||
|
||
|
||
# noinspection SqlDialectInspection | ||
class DbcTestCase(BaseTestCase): | ||
|
||
def test_read_odbc(self): | ||
connection_string = 'Driver={PostgreSQL};Server=postgres;Port=5432;Database=test;Uid=test;Pwd=test;' | ||
with turbodbc.connect(connection_string=connection_string) as conn: | ||
with conn.cursor() as cursor: | ||
cursor.execute("SELECT t_ts, t_id, t_instrument, t_exchange, t_price, t_size FROM CRYPTO_TRADES") | ||
table = dhodbc.read_cursor(cursor) | ||
self.assertEqual(table.size, cursor.rowcount) | ||
|
||
def test_read_adbc(self): | ||
import adbc_driver_postgresql.dbapi | ||
|
||
uri = "postgresql://postgres:5432/test?user=test&password=test" | ||
with adbc_driver_postgresql.dbapi.connect(uri) as conn: | ||
with conn.cursor() as cursor: | ||
cursor.execute("SELECT t_ts, t_exchange, t_price, t_size FROM CRYPTO_TRADES LIMIT 10") | ||
table = dhadbc.read_cursor(cursor) | ||
# This is not ideal but ADBC might have a bug regarding cursor.rowcount it currently returns -1 | ||
# instead of the actual size | ||
self.assertEqual(table.size, 10) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |