Skip to content

Commit

Permalink
Merge pull request #54 from tortoise/feature/prepared_statements
Browse files Browse the repository at this point in the history
Insert cleanup
  • Loading branch information
grigi authored Oct 16, 2018
2 parents 621911a + ebfd16d commit d136225
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 235 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[settings]
line_length=100
multi_line_output=0
known_third_party=
known_third_party=aiosqlite,ciso8601
not_skip=__init__.py
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
=========

0.10.9
------
- Uses macros on SQLite driver to minimise syncronisation. ``aiosqlite>=0.7.0``
- Uses prepared statements for insert, large insert performance increase.
- Pre-generate base pypika query object per model, providing general purpose speedup.

0.10.8
------
- Performance fixes from ``pypika>=0.15.6``
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __getattr__(cls, name):


project = 'Tortoise'
copyright = '2018, Andrey Bondar'
copyright = '2018, Andrey Bondar' # pylint: disable=W0622
author = 'Andrey Bondar'


Expand Down
31 changes: 16 additions & 15 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
aenum==2.0.8 # via pypika
aiocontextvars==0.1.2 ; python_version < "3.7"
aiomysql==0.0.19
aiosqlite==0.6.0
alabaster==0.7.11 # via sphinx
aiosqlite==0.7.0
alabaster==0.7.12 # via sphinx
asn1crypto==0.24.0 # via cryptography
astroid==2.0.4 # via pylint
asyncpg==0.17.0
Expand All @@ -18,20 +18,21 @@ bandit==1.5.1
certifi==2018.8.24 # via requests
cffi==1.11.5 # via cryptography
chardet==3.0.4 # via requests
ciso8601==2.0.1
ciso8601==2.1.1
click==7.0 # via pip-tools
cloud-sptheme==1.9.4
colorama==0.3.9 # via green
colorama==0.4.0 # via green
coverage==4.5.1 # via coveralls, green
coveralls==1.5.0
coveralls==1.5.1
cryptography==2.3.1 # via pymysql
docopt==0.6.2 # via coveralls
docutils==0.14
filelock==3.0.9 # via tox
flake8-isort==2.5
flake8==3.5.0 # via flake8-isort
gitdb2==2.0.4 # via gitpython
gitdb2==2.0.5 # via gitpython
gitpython==2.1.11 # via bandit
green==2.12.1
green==2.13.0
idna==2.7 # via cryptography, requests
imagesize==1.1.0 # via sphinx
isort==4.3.4 # via flake8-isort, pylint
Expand All @@ -42,32 +43,32 @@ mccabe==0.6.1 # via flake8, pylint
mypy-extensions==0.4.1 # via mypy
mypy==0.630
packaging==18.0 # via sphinx
pbr==4.2.0 # via stevedore
pip-tools==3.0.0
pbr==4.3.0 # via stevedore
pip-tools==3.1.0
pluggy==0.7.1 # via tox
py==1.6.0 # via tox
py==1.7.0 # via tox
pycodestyle==2.3.1 # via flake8
pycparser==2.19 # via cffi
pyflakes==1.6.0 # via flake8
pygments==2.2.0
pylint==2.1.1
pymysql==0.9.2 # via aiomysql
pyparsing==2.2.1 # via packaging
pypika==0.15.6
pyparsing==2.2.2 # via packaging
pypika==0.15.7
pytz==2018.5 # via babel
pyyaml==3.13 # via bandit
requests==2.19.1 # via coveralls, sphinx
six==1.11.0 # via astroid, bandit, cryptography, packaging, pip-tools, sphinx, stevedore, tox
smmap2==2.0.4 # via gitdb2
smmap2==2.0.5 # via gitdb2
snowballstemmer==1.2.1 # via sphinx
sphinx-autodoc-typehints==1.3.0
sphinx==1.8.1
sphinxcontrib-websupport==1.1.0 # via sphinx
stevedore==1.29.0 # via bandit
termstyle==0.1.11 # via green
testfixtures==6.3.0 # via flake8-isort
toml==0.9.6 # via tox
tox==3.4.0
toml==0.10.0 # via tox
tox==3.5.2
typed-ast==1.1.0 # via astroid, mypy
unidecode==1.0.22 # via green
urllib3==1.23 # via requests
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pypika>=0.15.6,<1.0
ciso8601>=2.0
aiocontextvars==0.1.2;python_version<"3.7"
aiosqlite>=0.6.0
aiosqlite>=0.7.0
10 changes: 9 additions & 1 deletion tortoise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ def _get_config_from_config_file(cls, config_file):
)
return config

@classmethod
def _build_initial_querysets(cls):
for app in cls.apps.values():
for model in app.values():
model._meta.basequery = model._meta.db.query_class.from_(model._meta.table)

@classmethod
async def init(
cls,
Expand Down Expand Up @@ -303,6 +309,8 @@ async def init(

cls._init_relations()

cls._build_initial_querysets()

cls._inited = True

@classmethod
Expand Down Expand Up @@ -374,4 +382,4 @@ async def do_stuff():
loop.run_until_complete(Tortoise.close_connections())


__version__ = "0.10.8"
__version__ = "0.10.9"
85 changes: 50 additions & 35 deletions tortoise/backends/asyncpg/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from functools import wraps
from typing import List, SupportsInt, Optional # noqa

import asyncpg
from pypika import PostgreSQLQuery
Expand All @@ -12,13 +14,26 @@
from tortoise.transactions import current_transaction_map


def translate_exceptions(func):
@wraps(func)
async def wrapped(self, query, *args):
try:
return await func(self, query, *args)
except asyncpg.exceptions.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.exceptions.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)
return wrapped


class AsyncpgDBClient(BaseDBAsyncClient):
DSN_TEMPLATE = 'postgres://{user}:{password}@{host}:{port}/{database}'
query_class = PostgreSQLQuery
executor_class = AsyncpgExecutor
schema_generator = AsyncpgSchemaGenerator

def __init__(self, user, password, database, host, port, **kwargs):
def __init__(self, user: str, password: str, database: str, host: str, port: SupportsInt,
**kwargs) -> None:
super().__init__(**kwargs)

self.user = user
Expand All @@ -34,14 +49,14 @@ def __init__(self, user, password, database, host, port, **kwargs):
port=self.port,
database=self.database
)
self._db_pool = None
self._connection = None
self._db_pool = None # Type: Optional[asyncpg.pool.Pool]
self._connection = None # Type: Optional[asyncpg.Connection]

self._transaction_class = type(
'TransactionWrapper', (TransactionWrapper, self.__class__), {}
)

async def create_connection(self):
async def create_connection(self) -> None:
try:
if not self.single_connection:
self._db_pool = await asyncpg.create_pool(self.dsn)
Expand All @@ -56,13 +71,13 @@ async def create_connection(self):
self.database
))

async def close(self):
if not self.single_connection:
async def close(self) -> None:
if self._db_pool:
await self._db_pool.close()
else:
if self._connection:
await self._connection.close()

async def db_create(self):
async def db_create(self) -> None:
single_connection = self.single_connection
self.single_connection = True
self._connection = await asyncpg.connect(self.DSN_TEMPLATE.format(
Expand All @@ -75,10 +90,10 @@ async def db_create(self):
await self.execute_script(
'CREATE DATABASE "{}" OWNER "{}"'.format(self.database, self.user)
)
await self._connection.close()
await self._connection.close() # type: ignore
self.single_connection = single_connection

async def db_delete(self):
async def db_delete(self) -> None:
single_connection = self.single_connection
self.single_connection = True
self._connection = await asyncpg.connect(self.DSN_TEMPLATE.format(
Expand All @@ -92,30 +107,40 @@ async def db_delete(self):
await self.execute_script('DROP DATABASE "{}"'.format(self.database))
except asyncpg.InvalidCatalogNameError:
pass
await self._connection.close()
await self._connection.close() # type: ignore
self.single_connection = single_connection

def acquire_connection(self):
def acquire_connection(self) -> ConnectionWrapper:
if not self.single_connection:
return self._db_pool.acquire()
return self._db_pool.acquire() # type: ignore
else:
return ConnectionWrapper(self._connection)

def _in_transaction(self):
def _in_transaction(self) -> 'TransactionWrapper':
if self.single_connection:
return self._transaction_class(self.connection_name, connection=self._connection)
else:
return self._transaction_class(self.connection_name, pool=self._db_pool)

async def execute_query(self, query, get_inserted_id=False):
try:
async with self.acquire_connection() as connection:
self.log.debug(query)
return await connection.fetch(query)
except asyncpg.exceptions.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.exceptions.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)
@translate_exceptions
async def execute_insert(self, query: str, values: list) -> int:
self.log.debug('%s: %s', query, values)
async with self.acquire_connection() as connection:
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetchval(*values)

@translate_exceptions
async def execute_query(self, query: str) -> List[dict]:
self.log.debug(query)
async with self.acquire_connection() as connection:
return await connection.fetch(query)

@translate_exceptions
async def execute_script(self, query: str) -> None:
self.log.debug(query)
async with self.acquire_connection() as connection:
await connection.execute(query)

async def get_single_connection(self):
if self.single_connection:
Expand All @@ -128,19 +153,9 @@ async def release_single_connection(self, single_connection):
if not self.single_connection:
await self._db_pool.release(single_connection.connection)

async def execute_script(self, script):
try:
async with self.acquire_connection() as connection:
self.log.debug(script)
await connection.execute(script)
except asyncpg.exceptions.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.exceptions.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)


class TransactionWrapper(AsyncpgDBClient, BaseTransactionWrapper):
def __init__(self, connection_name, pool=None, connection=None):
def __init__(self, connection_name: str, pool=None, connection=None) -> None:
if pool and connection:
raise ConfigurationError('You must pass either connection or pool')
self._connection = connection
Expand All @@ -155,7 +170,7 @@ def __init__(self, connection_name, pool=None, connection=None):
self.connection_name = connection_name
self.transaction = None

def acquire_connection(self):
def acquire_connection(self) -> ConnectionWrapper:
return ConnectionWrapper(self._connection)

async def _get_connection(self):
Expand Down
22 changes: 6 additions & 16 deletions tortoise/backends/asyncpg/executor.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
from typing import List

from pypika import Table

from tortoise.backends.base.executor import BaseExecutor


class AsyncpgExecutor(BaseExecutor):
async def execute_insert(self, instance):
self.connection = await self.db.get_single_connection()
regular_columns = self._prepare_insert_columns()
columns, values = self._prepare_insert_values(
instance=instance,
regular_columns=regular_columns,
)

query = (
def _prepare_insert_statement(self, columns: List[str]) -> str:
return str(
self.connection.query_class.into(Table(self.model._meta.table)).columns(*columns)
.insert(*values).returning('id')
)
result = await self.connection.execute_query(str(query))
instance.id = result[0][0]
await self.db.release_single_connection(self.connection)
self.connection = None
return instance
.insert('???').returning('id')
).replace("'???'", ','.join(['$%d' % (i + 1, ) for i in range(len(columns))]))
Loading

0 comments on commit d136225

Please sign in to comment.