Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add table settings support #22

Merged
merged 7 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
license="Apache 2.0",
package_dir={"": "."},
long_description=long_description,
long_description_content_type='text/markdown',
long_description_content_type="text/markdown",
packages=setuptools.find_packages("."),
classifiers=[
"Programming Language :: Python",
Expand All @@ -32,11 +32,14 @@
install_requires=requirements, # requirements.txt
options={"bdist_wheel": {"universal": True}},
extras_require={
"yc": ["yandexcloud", ],
"yc": [
"yandexcloud",
],
},
entry_points={
"sqlalchemy.dialects": [
"yql.ydb=ydb_sqlalchemy.sqlalchemy:YqlDialect",
"ydb=ydb_sqlalchemy.sqlalchemy:YqlDialect",
]
},
)
1 change: 1 addition & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from sqlalchemy.dialects import registry

registry.register("yql.ydb", "ydb_sqlalchemy.sqlalchemy", "YqlDialect")
registry.register("ydb", "ydb_sqlalchemy.sqlalchemy", "YqlDialect")
pytest.register_assert_rewrite("sqlalchemy.testing.assertions")

from sqlalchemy.testing.plugin.pytestplugin import * # noqa: E402, F401, F403
171 changes: 169 additions & 2 deletions test/test_core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from decimal import Decimal
from datetime import date, datetime

import pytest
import sqlalchemy as sa
from sqlalchemy import Table, Column, Integer, Unicode

from sqlalchemy.testing.fixtures import TestBase, TablesTest

from datetime import date, datetime
import ydb
from ydb._grpc.v4.protos import ydb_common_pb2

from ydb_sqlalchemy.sqlalchemy import types


def clear_sql(stm):
Expand Down Expand Up @@ -200,3 +204,166 @@ def test_select_types(self, connection):

row = connection.execute(sa.select(tb)).fetchone()
assert row == (1, "Hello World!", 3.5, True, now, today)


class TestWithClause(TablesTest):
run_create_tables = "each"

@staticmethod
def _create_table_and_get_desc(connection, metadata, **kwargs):
table = Table(
"clause_with_test",
metadata,
Column("id", types.UInt32, primary_key=True),
**kwargs,
)
table.create(connection)

session: ydb.Session = connection.connection.driver_connection.pool.acquire()
Copy link
Contributor

@LuckySting LuckySting Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: We have to return this session to the pool at the end, but it's not crucial in tests I think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be problem in tests too - pool has limited sessions (default 100), if the limit will be reached - code can't get new session.

It is not big problem in one place, but it will be problem in loops or if many tests will not return the session.
especially if reuse connection (and session pool) between tests.

It is better to fix it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added calling of delete() method for session

table_description = session.describe_table("/local/" + table.name)
session.delete()
return table_description

@pytest.mark.parametrize(
"auto_partitioning_by_size,res",
[
(None, ydb_common_pb2.FeatureFlag.Status.ENABLED),
(True, ydb_common_pb2.FeatureFlag.Status.ENABLED),
(False, ydb_common_pb2.FeatureFlag.Status.DISABLED),
],
)
def test_auto_partitioning_by_size(self, connection, auto_partitioning_by_size, res, metadata):
desc = self._create_table_and_get_desc(
connection, metadata, ydb_auto_partitioning_by_size=auto_partitioning_by_size
)
assert desc.partitioning_settings.partitioning_by_size == res

@pytest.mark.parametrize(
"auto_partitioning_by_load,res",
[
(None, ydb_common_pb2.FeatureFlag.Status.DISABLED),
(True, ydb_common_pb2.FeatureFlag.Status.ENABLED),
(False, ydb_common_pb2.FeatureFlag.Status.DISABLED),
],
)
def test_auto_partitioning_by_load(self, connection, auto_partitioning_by_load, res, metadata):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_auto_partitioning_by_load=auto_partitioning_by_load,
)
assert desc.partitioning_settings.partitioning_by_load == res

@pytest.mark.parametrize(
"auto_partitioning_partition_size_mb,res",
[
(None, 2048),
(2000, 2000),
],
)
def test_auto_partitioning_partition_size_mb(self, connection, auto_partitioning_partition_size_mb, res, metadata):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_auto_partitioning_partition_size_mb=auto_partitioning_partition_size_mb,
)
assert desc.partitioning_settings.partition_size_mb == res

@pytest.mark.parametrize(
"auto_partitioning_min_partitions_count,res",
[
(None, 1),
(10, 10),
],
)
def test_auto_partitioning_min_partitions_count(
self,
connection,
auto_partitioning_min_partitions_count,
res,
metadata,
):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_auto_partitioning_min_partitions_count=auto_partitioning_min_partitions_count,
)
assert desc.partitioning_settings.min_partitions_count == res

@pytest.mark.parametrize(
"auto_partitioning_max_partitions_count,res",
[
(None, 0),
(10, 10),
],
)
def test_auto_partitioning_max_partitions_count(
self,
connection,
auto_partitioning_max_partitions_count,
res,
metadata,
):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_auto_partitioning_max_partitions_count=auto_partitioning_max_partitions_count,
)
assert desc.partitioning_settings.max_partitions_count == res

@pytest.mark.parametrize(
"uniform_partitions,res",
[
(None, 1),
(10, 10),
],
)
def test_uniform_partitions(
self,
connection,
uniform_partitions,
res,
metadata,
):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_uniform_partitions=uniform_partitions,
)
# it not only do the initiation partition but also set up the minimum partition count
assert desc.partitioning_settings.min_partitions_count == res

@pytest.mark.parametrize(
"partition_at_keys,res",
[
(None, 1),
((100, 1000), 3),
],
)
def test_partition_at_keys(
self,
connection,
partition_at_keys,
res,
metadata,
):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_partition_at_keys=partition_at_keys,
)
assert desc.partitioning_settings.min_partitions_count == res

def test_several_keys(self, connection, metadata):
desc = self._create_table_and_get_desc(
connection,
metadata,
ydb_auto_partitioning_by_size=True,
ydb_auto_partitioning_by_load=True,
ydb_auto_partitioning_min_partitions_count=3,
ydb_auto_partitioning_max_partitions_count=5,
)
assert desc.partitioning_settings.partitioning_by_size == 1
assert desc.partitioning_settings.partitioning_by_load == 1
assert desc.partitioning_settings.min_partitions_count == 3
assert desc.partitioning_settings.max_partitions_count == 5
1 change: 1 addition & 0 deletions wait_container_ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import ydb
import time


def wait_container_ready(driver):
driver.wait(timeout=30)

Expand Down
49 changes: 48 additions & 1 deletion ydb_sqlalchemy/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,39 @@ def get_bind_types(


class YqlDDLCompiler(DDLCompiler):
pass
def post_create_table(self, table: sa.Table) -> str:
ydb_opts = table.dialect_options["ydb"]
with_clause_list = self._render_table_partitioning_settings(ydb_opts)
if with_clause_list:
with_clause_text = ",\n".join(with_clause_list)
return f"\nWITH (\n\t{with_clause_text}\n)"
return ""

def _render_table_partitioning_settings(self, ydb_opts: Dict[str, Any]) -> List[str]:
table_partitioning_settings = []
if ydb_opts["auto_partitioning_by_size"] is not None:
auto_partitioning_by_size = "ENABLED" if ydb_opts["auto_partitioning_by_size"] else "DISABLED"
table_partitioning_settings.append(f"AUTO_PARTITIONING_BY_SIZE = {auto_partitioning_by_size}")
if ydb_opts["auto_partitioning_by_load"] is not None:
auto_partitioning_by_load = "ENABLED" if ydb_opts["auto_partitioning_by_load"] else "DISABLED"
table_partitioning_settings.append(f"AUTO_PARTITIONING_BY_LOAD = {auto_partitioning_by_load}")
if ydb_opts["auto_partitioning_partition_size_mb"] is not None:
table_partitioning_settings.append(
f"AUTO_PARTITIONING_PARTITION_SIZE_MB = {ydb_opts['auto_partitioning_partition_size_mb']}"
)
if ydb_opts["auto_partitioning_min_partitions_count"] is not None:
table_partitioning_settings.append(
f"AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {ydb_opts['auto_partitioning_min_partitions_count']}"
)
if ydb_opts["auto_partitioning_max_partitions_count"] is not None:
table_partitioning_settings.append(
f"AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {ydb_opts['auto_partitioning_max_partitions_count']}"
)
if ydb_opts["uniform_partitions"] is not None:
table_partitioning_settings.append(f"UNIFORM_PARTITIONS = {ydb_opts['uniform_partitions']}")
if ydb_opts["partition_at_keys"] is not None:
table_partitioning_settings.append(f"PARTITION_AT_KEYS = {ydb_opts['partition_at_keys']}")
return table_partitioning_settings


def upsert(table):
Expand Down Expand Up @@ -425,6 +457,21 @@ class YqlDialect(StrCompileDialect):
ddl_compiler = YqlDDLCompiler
type_compiler = YqlTypeCompiler

construct_arguments = [
(
sa.schema.Table,
{
"auto_partitioning_by_size": None,
"auto_partitioning_by_load": None,
"auto_partitioning_partition_size_mb": None,
"auto_partitioning_min_partitions_count": None,
"auto_partitioning_max_partitions_count": None,
"uniform_partitions": None,
"partition_at_keys": None,
},
),
]

@classmethod
def import_dbapi(cls: Any):
return dbapi
Expand Down
Loading