Skip to content

Commit

Permalink
Update DBAPI to QueryService
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Nov 2, 2024
1 parent 05d9299 commit c158bd1
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 1,350 deletions.
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
version: "3.3"
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:trunk
image: ydbplatform/local-ydb:trunk
restart: always
ports:
- "2136:2136"
hostname: localhost
environment:
- YDB_USE_IN_MEMORY_PDISKS=true
- YDB_ENABLE_COLUMN_TABLES=true
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
sqlalchemy >= 2.0.7, < 3.0.0
ydb >= 3.11.3
ydb >= 3.18.8
ydb-dbapi==0.0.1b7
7 changes: 5 additions & 2 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
sqlalchemy==2.0.7
ydb==3.11.3
pyyaml==5.3.1
greenlet

sqlalchemy==2.0.7
ydb >= 3.18.8
ydb-dbapi==0.0.1b7
requests<2.29
pytest==7.2.2
docker==6.0.1
Expand Down
148 changes: 39 additions & 109 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,16 @@ def test_sa_text(self, connection):
SELECT x, y FROM AS_TABLE(:data)
"""
),
[{"data": [{"x": 2, "y": 1}, {"x": 3, "y": 2}]}],
[{
"data": ydb.TypedValue(
[{"x": 2, "y": 1}, {"x": 3, "y": 2}],
ydb.ListType(
ydb.StructType()
.add_member('x', ydb.PrimitiveType.Int64)
.add_member('y', ydb.PrimitiveType.Int64)
)
)
}],
)
assert set(rs.fetchall()) == {(2, 1), (3, 2)}

Expand Down Expand Up @@ -454,85 +463,6 @@ def test_several_keys(self, connection, metadata):
assert desc.partitioning_settings.max_partitions_count == 5


class TestScanQuery(TablesTest):
__backend__ = True

@classmethod
def define_tables(cls, metadata: sa.MetaData):
Table(
"test",
metadata,
Column("id", Integer, primary_key=True),
)

@classmethod
def insert_data(cls, connection: sa.Connection):
table = cls.tables.test
for i in range(50):
connection.execute(ydb_sa.upsert(table).values([{"id": i * 1000 + j} for j in range(1000)]))

def test_characteristic(self):
engine = self.bind.execution_options()

with engine.connect() as connection:
default_options = connection.get_execution_options()

with engine.connect() as connection:
connection.execution_options(ydb_scan_query=True)
options_after_set = connection.get_execution_options()

with engine.connect() as connection:
options_after_reset = connection.get_execution_options()

assert "ydb_scan_query" not in default_options
assert options_after_set["ydb_scan_query"]
assert "ydb_scan_query" not in options_after_reset

def test_fetchmany(self, connection_no_trans: sa.Connection):
table = self.tables.test
stmt = sa.select(table).where(table.c.id % 2 == 0)

connection_no_trans.execution_options(ydb_scan_query=True)
cursor = connection_no_trans.execute(stmt)

assert cursor.cursor.use_scan_query
result = cursor.fetchmany(1000) # fetches only the first 5k rows
assert result == [(i,) for i in range(2000) if i % 2 == 0]

def test_fetchall(self, connection_no_trans: sa.Connection):
table = self.tables.test
stmt = sa.select(table).where(table.c.id % 2 == 0)

connection_no_trans.execution_options(ydb_scan_query=True)
cursor = connection_no_trans.execute(stmt)

assert cursor.cursor.use_scan_query
result = cursor.fetchall()
assert result == [(i,) for i in range(50000) if i % 2 == 0]

def test_begin_does_nothing(self, connection_no_trans: sa.Connection):
table = self.tables.test
connection_no_trans.execution_options(ydb_scan_query=True)

with connection_no_trans.begin():
cursor = connection_no_trans.execute(sa.select(table))

assert cursor.cursor.use_scan_query
assert cursor.cursor.tx_context is None

def test_engine_option(self):
table = self.tables.test
engine = self.bind.execution_options(ydb_scan_query=True)

with engine.begin() as connection:
cursor = connection.execute(sa.select(table))
assert cursor.cursor.use_scan_query

with engine.begin() as connection:
cursor = connection.execute(sa.select(table))
assert cursor.cursor.use_scan_query


class TestTransaction(TablesTest):
__backend__ = True

Expand Down Expand Up @@ -585,11 +515,11 @@ def test_interactive_transaction(

connection_no_trans.execution_options(isolation_level=isolation_level)
with connection_no_trans.begin():
tx_id = dbapi_connection.tx_context.tx_id
assert tx_id is not None
cursor1 = connection_no_trans.execute(sa.select(table))
tx_id = dbapi_connection._tx_context.tx_id
assert tx_id is not None
cursor2 = connection_no_trans.execute(sa.select(table))
assert dbapi_connection.tx_context.tx_id == tx_id
assert dbapi_connection._tx_context.tx_id == tx_id

assert set(cursor1.fetchall()) == {(5,), (6,)}
assert set(cursor2.fetchall()) == {(5,), (6,)}
Expand All @@ -614,10 +544,10 @@ def test_not_interactive_transaction(

connection_no_trans.execution_options(isolation_level=isolation_level)
with connection_no_trans.begin():
assert dbapi_connection.tx_context is None
assert dbapi_connection._tx_context is None
cursor1 = connection_no_trans.execute(sa.select(table))
cursor2 = connection_no_trans.execute(sa.select(table))
assert dbapi_connection.tx_context is None
assert dbapi_connection._tx_context is None

assert set(cursor1.fetchall()) == {(7,), (8,)}
assert set(cursor2.fetchall()) == {(7,), (8,)}
Expand All @@ -631,14 +561,14 @@ class IsolationSettings(NamedTuple):
interactive: bool

YDB_ISOLATION_SETTINGS_MAP = {
IsolationLevel.AUTOCOMMIT: IsolationSettings(ydb.SerializableReadWrite().name, False),
IsolationLevel.SERIALIZABLE: IsolationSettings(ydb.SerializableReadWrite().name, True),
IsolationLevel.ONLINE_READONLY: IsolationSettings(ydb.OnlineReadOnly().name, False),
IsolationLevel.AUTOCOMMIT: IsolationSettings(ydb.QuerySerializableReadWrite().name, False),
IsolationLevel.SERIALIZABLE: IsolationSettings(ydb.QuerySerializableReadWrite().name, True),
IsolationLevel.ONLINE_READONLY: IsolationSettings(ydb.QueryOnlineReadOnly().name, False),
IsolationLevel.ONLINE_READONLY_INCONSISTENT: IsolationSettings(
ydb.OnlineReadOnly().with_allow_inconsistent_reads().name, False
ydb.QueryOnlineReadOnly().with_allow_inconsistent_reads().name, False
),
IsolationLevel.STALE_READONLY: IsolationSettings(ydb.StaleReadOnly().name, False),
IsolationLevel.SNAPSHOT_READONLY: IsolationSettings(ydb.SnapshotReadOnly().name, True),
IsolationLevel.STALE_READONLY: IsolationSettings(ydb.QueryStaleReadOnly().name, False),
IsolationLevel.SNAPSHOT_READONLY: IsolationSettings(ydb.QuerySnapshotReadOnly().name, True),
}

def test_connection_set(self, connection_no_trans: sa.Connection):
Expand All @@ -647,13 +577,13 @@ def test_connection_set(self, connection_no_trans: sa.Connection):
for sa_isolation_level, ydb_isolation_settings in self.YDB_ISOLATION_SETTINGS_MAP.items():
connection_no_trans.execution_options(isolation_level=sa_isolation_level)
with connection_no_trans.begin():
assert dbapi_connection.tx_mode.name == ydb_isolation_settings[0]
assert dbapi_connection._tx_mode.name == ydb_isolation_settings[0]
assert dbapi_connection.interactive_transaction is ydb_isolation_settings[1]
if dbapi_connection.interactive_transaction:
assert dbapi_connection.tx_context is not None
assert dbapi_connection.tx_context.tx_id is not None
assert dbapi_connection._tx_context is not None
# assert dbapi_connection._tx_context.tx_id is not None
else:
assert dbapi_connection.tx_context is None
assert dbapi_connection._tx_context is None


class TestEngine(TestBase):
Expand All @@ -674,7 +604,7 @@ def ydb_driver(self):

@pytest.fixture(scope="class")
def ydb_pool(self, ydb_driver):
session_pool = ydb.SessionPool(ydb_driver, size=5, workers_threads_count=1)
session_pool = ydb.QuerySessionPool(ydb_driver, size=5)

try:
yield session_pool
Expand All @@ -689,8 +619,8 @@ def test_sa_queue_pool_with_ydb_shared_session_pool(self, ydb_driver, ydb_pool):
dbapi_conn1: dbapi.Connection = conn1.connection.dbapi_connection
dbapi_conn2: dbapi.Connection = conn2.connection.dbapi_connection

assert dbapi_conn1.session_pool is dbapi_conn2.session_pool
assert dbapi_conn1.driver is dbapi_conn2.driver
assert dbapi_conn1._session_pool is dbapi_conn2._session_pool
assert dbapi_conn1._driver is dbapi_conn2._driver

engine1.dispose()
engine2.dispose()
Expand All @@ -704,14 +634,13 @@ def test_sa_null_pool_with_ydb_shared_session_pool(self, ydb_driver, ydb_pool):
dbapi_conn1: dbapi.Connection = conn1.connection.dbapi_connection
dbapi_conn2: dbapi.Connection = conn2.connection.dbapi_connection

assert dbapi_conn1.session_pool is dbapi_conn2.session_pool
assert dbapi_conn1.driver is dbapi_conn2.driver
assert dbapi_conn1._session_pool is dbapi_conn2._session_pool
assert dbapi_conn1._driver is dbapi_conn2._driver

engine1.dispose()
engine2.dispose()
assert not ydb_driver._stopped


class TestAsyncEngine(TestEngine):
__only_on__ = "yql+ydb_async"

Expand All @@ -726,14 +655,15 @@ def ydb_driver(self):
finally:
loop.run_until_complete(driver.stop())

@pytest.mark.asyncio
@pytest.fixture(scope="class")
def ydb_pool(self, ydb_driver):
session_pool = ydb.aio.SessionPool(ydb_driver, size=5)
loop = asyncio.get_event_loop()
session_pool = ydb.aio.QuerySessionPool(ydb_driver, size=5, loop=loop)

try:
yield session_pool
finally:
loop = asyncio.get_event_loop()
loop.run_until_complete(session_pool.stop())


Expand All @@ -742,9 +672,9 @@ class TestCredentials(TestBase):
__only_on__ = "yql+ydb"

@pytest.fixture(scope="class")
def table_client_settings(self):
def query_client_settings(self):
yield (
ydb.TableClientSettings()
ydb.QueryClientSettings()
.with_native_date_in_result_sets(True)
.with_native_datetime_in_result_sets(True)
.with_native_timestamp_in_result_sets(True)
Expand All @@ -753,18 +683,18 @@ def table_client_settings(self):
)

@pytest.fixture(scope="class")
def driver_config_for_credentials(self, table_client_settings):
def driver_config_for_credentials(self, query_client_settings):
url = config.db_url
endpoint = f"grpc://{url.host}:{url.port}"
database = url.database

yield ydb.DriverConfig(
endpoint=endpoint,
database=database,
table_client_settings=table_client_settings,
query_client_settings=query_client_settings,
)

def test_ydb_credentials_good(self, table_client_settings, driver_config_for_credentials):
def test_ydb_credentials_good(self, query_client_settings, driver_config_for_credentials):
credentials_good = ydb.StaticCredentials(
driver_config=driver_config_for_credentials,
user="root",
Expand All @@ -775,7 +705,7 @@ def test_ydb_credentials_good(self, table_client_settings, driver_config_for_cre
result = conn.execute(sa.text("SELECT 1 as value"))
assert result.fetchone()

def test_ydb_credentials_bad(self, table_client_settings, driver_config_for_credentials):
def test_ydb_credentials_bad(self, query_client_settings, driver_config_for_credentials):
credentials_bad = ydb.StaticCredentials(
driver_config=driver_config_for_credentials,
user="root",
Expand Down
12 changes: 12 additions & 0 deletions test/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,18 @@ def test_struct_type_bind_variable(self, connection):

eq_(connection.scalar(stmt, {"struct": {"id": 1}}), 1)

def test_struct_type_bind_variable_text(self, connection):
rs = connection.execute(
sa.text("SELECT :struct.x + :struct.y").bindparams(
sa.bindparam(
key="struct",
type_=ydb_sa_types.StructType({"x": sa.Integer, "y": sa.Integer}),
value={"x": 1, "y": 2}
)
)
)
assert rs.scalar() == 3

def test_from_as_table(self, connection):
table = self.tables.container_types_test

Expand Down
Empty file removed test_dbapi/__init__.py
Empty file.
Loading

0 comments on commit c158bd1

Please sign in to comment.