Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relational data ingestion via ADBC/ODBC #3279

Merged
merged 7 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ def runInDocker = { String name, String sourcePath, List<String> command, Closur
// set up the container, env vars - things that aren't likely to change
from 'deephaven/server-netty:local-build'
runCommand '''set -eux; \\
pip3 install pyarrow unittest-xml-reporting==3.0.4;\\
mkdir -p /out/report'''
pip3 install unittest-xml-reporting==3.0.4; \\
mkdir -p /out/report; \\
echo "[PostgreSQL]" > /etc/odbcinst.ini; \\
echo "Driver = /usr/lib/x86_64-linux-gnu/odbc/psqlodbcw.so" >> /etc/odbcinst.ini; \\
echo "Threading = 2" >> /etc/odbcinst.ini'''

volume '/data'
volume '/cache'
environmentVariable 'DEEPHAVEN_CLASSPATH', '/classpath/*:/classpath:/opt/deephaven/server/lib/*:/opt/deephaven/server/lib/'
Expand Down
7 changes: 7 additions & 0 deletions Integrations/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ services:
expose:
- 29092
- 8081

postgres:
extends:
file: ../postgres/docker-compose.yml
service: postgres
expose:
- 5432
2 changes: 1 addition & 1 deletion docker/registry/server-base/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
io.deephaven.project.ProjectType=DOCKER_REGISTRY
deephaven.registry.imageName=ghcr.io/deephaven/server-base:edge
deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:c9413a0b02287ccd16dd586c67fa42f526e68011d879795f5abd104c76ce4b42
deephaven.registry.imageId=ghcr.io/deephaven/server-base@sha256:851d9ca940b9c3d779cbed459ed83d1866975a206b6fbdd364e41b63582ec5e6
2 changes: 1 addition & 1 deletion docker/registry/slim-base/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
io.deephaven.project.ProjectType=DOCKER_REGISTRY
deephaven.registry.imageName=ghcr.io/deephaven/server-slim-base:edge
deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:9db84913893f373158244327cb1c0ea0b1085908bfccf13ca679706585923525
deephaven.registry.imageId=ghcr.io/deephaven/server-slim-base@sha256:5b79fe44fd5352a89a1291b9d886325037f85d089787b648e6cb7f3d95d7a0df
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ numba==0.56.4
numpy==1.23.5
pandas==1.5.3
parso==0.8.3
pyarrow==10.0.1
pyarrow==11.0.0
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
2 changes: 1 addition & 1 deletion docker/server/src/main/server-netty/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ numba==0.56.4
numpy==1.23.5
pandas==1.5.3
parso==0.8.3
pyarrow==10.0.1
pyarrow==11.0.0
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
13 changes: 13 additions & 0 deletions postgres/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: "3.4"

services:
postgres:
build:
context: src
hostname: postgres
ports:
- 5432:5432
environment:
- POSTGRES_PASSWORD=test
- POSTGRES_USER=test
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
command: postgres -c fsync=off -c synchronous_commit=off
5 changes: 5 additions & 0 deletions postgres/src/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Test-only PostgreSQL image pre-loaded with the crypto trades sample data.
# NOTE(review): `latest` is a floating tag; consider pinning a major version so test runs are reproducible.
FROM postgres:latest
# Staging directory for the CSV that gen_test_data.sql loads via COPY ... FROM '/tmp/psql_data/crypto_trades.csv'.
RUN mkdir -p /tmp/psql_data/

COPY crypto_trades.csv /tmp/psql_data/
# Presumably picked up by the postgres image entrypoint when the database is first initialized -- confirm
# against the image documentation.
COPY gen_test_data.sql /docker-entrypoint-initdb.d/
88 changes: 88 additions & 0 deletions postgres/src/crypto_trades.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"Date", "Timestamp", "Id", "Instrument", "Exchange", "Price", "Size"
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"2021-09-22","1632326411780000000","609511672","ETH/USD","binance","3009.420000",".006300"
"2021-09-22","1632326411783000000","1064350098","BTC/USD","binance","43229.460000",".000430"
"2021-09-22","1632326411858000000","158876549","ETH/USD","coinbase-pro","3011.110000",".003305"
"2021-09-22","1632326411918000000","1064350099","BTC/USD","binance","43229.450000",".001290"
"2021-09-22","1632326411933000000","199242450","BTC/USD","bitstamp","43259.000000",".020000"
"2021-09-22","1632326411949000000","1064350100","BTC/USD","binance","43229.460000",".000010"
"2021-09-22","1632326411949000000","1064350101","BTC/USD","binance","43234.000000",".005220"
"2021-09-22","1632326411963000000","1064350102","BTC/USD","binance","43229.460000",".016680"
"2021-09-22","1632326411977000000","1064350103","BTC/USD","binance","43234.160000",".000580"
"2021-09-22","1632326412030000000","214133126","BTC/USD","coinbase-pro","43249.320000",".018136"
"2021-09-22","1632326412023000000","297608806","ETH/BTC","binance",".069603",".097100"
"2021-09-22","1632326412037000000","1064350104","BTC/USD","binance","43234.160000",".004260"
"2021-09-22","1632326412066000000","1064350105","BTC/USD","binance","43234.160000",".000010"
"2021-09-22","1632326412066000000","1064350106","BTC/USD","binance","43234.430000",".002160"
"2021-09-22","1632326412066000000","1064350107","BTC/USD","binance","43236.720000",".001040"
"2021-09-22","1632326412085000000","1064350108","BTC/USD","binance","43234.160000",".005770"
"2021-09-22","1632326412099000000","1064350109","BTC/USD","binance","43234.150000",".016680"
"2021-09-22","1632326412142000000","214133127","BTC/USD","coinbase-pro","43249.320000",".002232"
"2021-09-22","1632326412205000000","214133128","BTC/USD","coinbase-pro","43249.320000",".095200"
"2021-09-22","1632326412174000000","1064350110","BTC/USD","binance","43234.160000",".000230"
"2021-09-22","1632326412174000000","1064350111","BTC/USD","binance","43234.160000",".042950"
"2021-09-22","1632326412221000000","1064350112","BTC/USD","binance","43234.160000",".002310"
"2021-09-22","1632326412228000000","1064350113","BTC/USD","binance","43234.160000",".002340"
"2021-09-22","1632326412289000000","1064350114","BTC/USD","binance","43230.780000",".003900"
"2021-09-22","1632326412296000000","1064350115","BTC/USD","binance","43230.770000",".016680"
"2021-09-22","1632326412349000000","297608807","ETH/BTC","binance",".069600",".801800"
"2021-09-22","1632326412404000000","214133129","BTC/USD","coinbase-pro","43249.320000",".000050"
"2021-09-22","1632326412404000000","214133130","BTC/USD","coinbase-pro","43249.320000",".003759"
"2021-09-22","1632326412404000000","214133131","BTC/USD","coinbase-pro","43249.320000",".115619"
"2021-09-22","1632326412404000000","214133132","BTC/USD","coinbase-pro","43249.330000",".003759"
"2021-09-22","1632326412404000000","214133133","BTC/USD","coinbase-pro","43250.750000",".304303"
"2021-09-22","1632326412359000000","1064350116","BTC/USD","binance","43229.450000",".016680"
"2021-09-22","1632326412404000000","214133134","BTC/USD","coinbase-pro","43251.690000",".105009"
"2021-09-22","1632326412360000000","609511673","ETH/USD","binance","3009.420000",".796500"
"2021-09-22","1632326412360000000","609511674","ETH/USD","binance","3009.420000",".005300"
"2021-09-22","1632326412360000000","1064350117","BTC/USD","binance","43233.170000",".055800"
"2021-09-22","1632326412414000000","1064350118","BTC/USD","binance","43229.460000",".001170"
"2021-09-22","1632326412000000000","59114904516","ETH/USD","gemini","3011.550000",".204000"
"2021-09-22","1632326412000000000","59114904518","ETH/USD","gemini","3011.830000",".060712"
"2021-09-22","1632326412479000000","609511675","ETH/USD","binance","3009.430000",".006600"
"2021-09-22","1632326412480000000","609511676","ETH/USD","binance","3009.430000","1.477100"
"2021-09-22","1632326412480000000","609511677","ETH/USD","binance","3009.430000",".344500"
"2021-09-22","1632326412488000000","1064350119","BTC/USD","binance","43230.000000",".000450"
"2021-09-22","1632326412498000000","1064350120","BTC/USD","binance","43229.990000",".000600"
"2021-09-22","1632326412499000000","609511678","ETH/USD","binance","3009.600000",".259000"
"2021-09-22","1632326412554000000","158876550","ETH/USD","coinbase-pro","3011.240000","1.574570"
"2021-09-22","1632326412559000000","158876551","ETH/USD","coinbase-pro","3011.310000",".157507"
"2021-09-22","1632326412559000000","158876552","ETH/USD","coinbase-pro","3011.320000",".005362"
"2021-09-22","1632326412574000000","158876553","ETH/USD","coinbase-pro","3011.320000",".152145"
"2021-09-22","1632326412542000000","297608808","ETH/BTC","binance",".069610",".338300"
"2021-09-22","1632326412544000000","609511679","ETH/USD","binance","3009.600000","1.341000"
"2021-09-22","1632326412594000000","214133135","BTC/USD","coinbase-pro","43251.690000",".000207"
"2021-09-22","1632326412552000000","609511680","ETH/USD","binance","3009.590000",".338300"
"2021-09-22","1632326412552000000","1064350121","BTC/USD","binance","43230.000000",".023540"
"2021-09-22","1632326412588000000","1064350122","BTC/USD","binance","43230.000000",".000010"
"2021-09-22","1632326412618000000","609511681","ETH/USD","binance","3009.600000",".108800"
"2021-09-22","1632326412619000000","609511682","ETH/USD","binance","3009.870000",".332400"
"2021-09-22","1632326412717000000","214133136","BTC/USD","coinbase-pro","43251.690000",".002213"
"2021-09-22","1632326412728000000","1064350123","BTC/USD","binance","43234.160000",".004910"
"2021-09-22","1632326412749000000","1064350124","BTC/USD","binance","43234.160000",".001150"
"2021-09-22","1632326412000000000","837177433","BTC/USD","bitfinex","43236.000000",".000628"
"2021-09-22","1632326412000000000","837177434","BTC/USD","bitfinex","43236.000000",".199372"
"2021-09-22","1632326412763000000","1064350125","BTC/USD","binance","43234.160000",".000700"
"2021-09-22","1632326412763000000","1064350126","BTC/USD","binance","43237.240000",".005320"
"2021-09-22","1632326412811000000","609511683","ETH/USD","binance","3009.630000","2.400000"
"2021-09-22","1632326412811000000","609511684","ETH/USD","binance","3009.740000","4.000000"
"2021-09-22","1632326412811000000","609511685","ETH/USD","binance","3009.870000","1.267600"
"2021-09-22","1632326412918000000","214133137","BTC/USD","coinbase-pro","43251.690000",".050000"
"2021-09-22","1632326412985000000","158876554","ETH/USD","coinbase-pro","3011.770000",".006608"
"2021-09-22","1632326412932000000","1064350127","BTC/USD","binance","43236.810000",".005150"
"2021-09-22","1632326412955000000","1064350128","BTC/USD","binance","43236.810000",".014800"
"2021-09-22","1632326412960000000","1064350129","BTC/USD","binance","43236.810000",".006680"
"2021-09-22","1632326413023000000","609511686","ETH/USD","binance","3010.000000",".590600"
"2021-09-22","1632326413063000000","297608809","ETH/BTC","binance",".069610",".110700"
"2021-09-22","1632326413000000000","837177435","BTC/USD","bitfinex","43236.000000",".004790"
"2021-09-22","1632326413141000000","609511687","ETH/USD","binance","3010.000000",".044900"
"2021-09-22","1632326413213000000","214133138","BTC/USD","coinbase-pro","43251.690000",".000269"
"2021-09-22","1632326413163000000","609511688","ETH/USD","binance","3010.010000",".131200"
"2021-09-22","1632326413251000000","1064350130","BTC/USD","binance","43236.810000",".000510"
"2021-09-22","1632326413264000000","609511689","ETH/USD","binance","3010.000000",".005000"
"2021-09-22","1632326413288000000","609511690","ETH/USD","binance","3010.000000",".022600"
"2021-09-22","1632326413317000000","297608810","ETH/BTC","binance",".069611",".096100"
"2021-09-22","1632326413384000000","13128764","DOGE/USD","coinbase-pro",".215400","62.400000"
"2021-09-22","1632326413424000000","214133139","BTC/USD","coinbase-pro","43251.690000",".062487"
"2021-09-22","1632326413424000000","214133140","BTC/USD","coinbase-pro","43251.690000",".001730"
"2021-09-22","1632326413424000000","214133141","BTC/USD","coinbase-pro","43251.690000",".003759"
"2021-09-22","1632326413424000000","214133142","BTC/USD","coinbase-pro","43251.690000",".115624"
14 changes: 14 additions & 0 deletions postgres/src/gen_test_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Test fixture table populated from crypto_trades.csv. The column order follows the CSV header:
-- Date, Timestamp, Id, Instrument, Exchange, Price, Size.
CREATE TABLE CRYPTO_TRADES(
T_DATE date,
-- Presumably nanoseconds since the Unix epoch, judging by the 19-digit sample values; TODO confirm.
T_TS bigint,
-- Trade ids are numeric in the sample data but are stored as text here.
T_ID varchar(20),
T_INSTRUMENT varchar(10),
T_EXCHANGE text,
T_PRICE double precision,
T_SIZE real
);

COPY CRYPTO_TRADES
FROM '/tmp/psql_data/crypto_trades.csv'
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
DELIMITER ','
CSV HEADER;
5 changes: 5 additions & 0 deletions py/server/deephaven/dbc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

"""The dbc package includes the modules for using external databases with Deephaven."""
47 changes: 47 additions & 0 deletions py/server/deephaven/dbc/adbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""This module supports ingesting data from external databases (relational and other types) into Deephaven via the
Python DB-API 2.0 (PEP 249) and the Apache Arrow Database Connectivity (ADBC) interfaces. (Please refer to
https://arrow.apache.org/docs/dev/format/ADBC.html for more details on ADBC).

ADBC defines a standard API to fetch data in Arrow format from databases that support Arrow natively as well as
from databases that only support ODBC/JDBC. By relying on ADBC, Deephaven is able to ingest data efficiently from a
wide variety of data sources. """

from typing import Any

from deephaven.table import Table
from deephaven import arrow as dharrow
from deephaven import DHError

# The ADBC driver manager is an optional dependency: fail the module import with a DHError that tells the
# user what to install, rather than with a bare ImportError.
try:
    import adbc_driver_manager.dbapi
except ImportError as e:
    # Pass the ImportError as the cause and chain it explicitly so the name of the missing package is
    # preserved, matching how DHError is raised elsewhere in this module.
    raise DHError(e, message="import ADBC driver manager failed, please install ADBC driver manager and your "
                             "targeted database driver first.") from e


def read_cursor(cursor: adbc_driver_manager.dbapi.Cursor) -> Table:
    """Builds a Deephaven table from the result set held by an ADBC DB-API cursor.

    Args:
        cursor (adbc_driver_manager.dbapi.Cursor): an ADBC DB-API cursor whose execute() method has already been
            called to run a query operation that produces an Arrow table

    Returns:
        a new Table

    Raises:
        TypeError: if cursor is not an ADBC DB-API cursor
        DHError: if the result set cannot be fetched as an Arrow table
    """
    if isinstance(cursor, adbc_driver_manager.dbapi.Cursor):
        try:
            arrow_table = cursor.fetch_arrow_table()
        except Exception as err:
            raise DHError(err, message="failed to fetch ADBC result as an Arrow table.") from err
        # The conversion stays outside the try block so that its own errors are not re-wrapped as fetch failures.
        return dharrow.to_table(arrow_table)

    raise TypeError(f"expect {adbc_driver_manager.dbapi.Cursor} got {type(cursor)} instead.")
45 changes: 45 additions & 0 deletions py/server/deephaven/dbc/odbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""This module supports ingesting data from external relational databases into Deephaven via the Python DB-API 2.0
(PEP 249) and the Open Database Connectivity (ODBC) interfaces by using the Turbodbc module.

Turbodbc is DB-API 2.0 compliant, provides access to relational databases via the ODBC interface and more
importantly it has optimized, built-in Apache Arrow support when fetching ODBC result sets. This enables Deephaven to
achieve maximum efficiency when ingesting relational data. """

from typing import Any

from deephaven import DHError
from deephaven import arrow as dharrow
from deephaven.table import Table

# Turbodbc is an optional dependency: fail the module import with a DHError that tells the user what to
# install, rather than with a bare ImportError.
try:
    import turbodbc.cursor
except ImportError as e:
    # Pass the ImportError as the cause and chain it explicitly so the name of the missing package is
    # preserved, matching how DHError is raised elsewhere in this module.
    raise DHError(e, message="import turbodbc failed, please install turbodbc, the ODBC driver manager, and the "
                             "targeted database driver first.") from e


def read_cursor(cursor: turbodbc.cursor.Cursor) -> Table:
    """Converts the result set of the provided cursor into a Deephaven table.

    Args:
        cursor (turbodbc.cursor.Cursor): a Turbodbc cursor. Prior to it being passed in, its execute() method must
            be called to run a query operation that produces a result set

    Returns:
        a new Table

    Raises:
        TypeError: if cursor is not a Turbodbc cursor
        DHError: if the result set cannot be fetched as an Arrow table
    """

    if not isinstance(cursor, turbodbc.cursor.Cursor):
        raise TypeError(f"expect {turbodbc.cursor.Cursor} got {type(cursor)} instead.")

    try:
        pa_table = cursor.fetchallarrow()
    except Exception as e:
        # Message corrected from "a Arrow table" to match the wording used by the ADBC reader.
        raise DHError(e, message="failed to fetch ODBC result as an Arrow table.") from e

    # Conversion is kept outside the try block so its own errors are not reported as fetch failures.
    return dharrow.to_table(pa_table)
37 changes: 37 additions & 0 deletions py/server/tests/test_dbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
import unittest

import turbodbc

from deephaven.dbc import odbc as dhodbc, adbc as dhadbc
from tests.testbase import BaseTestCase


# noinspection SqlDialectInspection
class DbcTestCase(BaseTestCase):
    """Tests for the ODBC and ADBC cursor readers, run against the `postgres` test database host."""

    def test_read_odbc(self):
        """Reads CRYPTO_TRADES through Turbodbc and checks the table size against the cursor's row count."""
        dsn = 'Driver={PostgreSQL};Server=postgres;Port=5432;Database=test;Uid=test;Pwd=test;'
        with turbodbc.connect(connection_string=dsn) as connection, connection.cursor() as odbc_cursor:
            odbc_cursor.execute("SELECT t_ts, t_id, t_instrument, t_exchange, t_price, t_size FROM CRYPTO_TRADES")
            result = dhodbc.read_cursor(odbc_cursor)
            self.assertEqual(result.size, odbc_cursor.rowcount)

    def test_read_adbc(self):
        """Reads ten rows of CRYPTO_TRADES through the ADBC PostgreSQL driver and checks the table size."""
        import adbc_driver_postgresql.dbapi

        pg_uri = "postgresql://postgres:5432/test?user=test&password=test"
        with adbc_driver_postgresql.dbapi.connect(pg_uri) as connection, connection.cursor() as adbc_cursor:
            adbc_cursor.execute("SELECT t_ts, t_exchange, t_price, t_size FROM CRYPTO_TRADES LIMIT 10")
            result = dhadbc.read_cursor(adbc_cursor)
            # Not ideal: the ADBC cursor's rowcount currently comes back as -1 instead of the actual size
            # (possibly a driver bug), so the expected size is taken from the LIMIT clause instead.
            self.assertEqual(result.size, 10)


if __name__ == '__main__':
unittest.main()